This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new d87353432b Port akka PRs #31891, #31892, #31893: tags in 
EventEnvelope, EventsByPersistenceIdTyped queries, withTaggerForState (#2920)
d87353432b is described below

commit d87353432b1e4b558538f7467e0d5590dfdca784
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Apr 27 12:51:36 2026 +0200

    Port akka PRs #31891, #31892, #31893: tags in EventEnvelope, 
EventsByPersistenceIdTyped queries, withTaggerForState (#2920)
    
    * Port akka PRs #31891, #31892, #31893: tags in EventEnvelope, 
EventsByPersistenceIdTyped queries, withTaggerForState
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko/sessions/a96f8dd0-0966-4aea-86f6-9892c68ee5c1
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix imports: use scala.jdk.CollectionConverters instead of 
pekko.util.ccompat.JavaConverters
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko/sessions/a96f8dd0-0966-4aea-86f6-9892c68ee5c1
    
    Co-authored-by: pjfanning <[email protected]>
    
    * scalafmt
    
    * javafmt
    
    * Fix PersistenceProbeImpl: call tagger(state, event) after signature 
change to (S,E)=>Set[String]
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko/sessions/80c7afcd-d6ae-4b03-83b6-207aedfdd8cd
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Update typed-event-envelope-tags.excludes
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../query/internal/protobuf/QueryMessages.java     | 241 ++++++++++++++++++++-
 .../typed-event-envelope-tags.excludes             |  27 +++
 .../src/main/protobuf/QueryMessages.proto          |   1 +
 .../query/internal/QuerySerializer.scala           |  14 +-
 .../persistence/query/typed/EventEnvelope.scala    |  82 ++++++-
 .../CurrentEventsByPersistenceIdTypedQuery.scala   |  41 ++++
 .../javadsl/EventsByPersistenceIdTypedQuery.scala  |  48 ++++
 .../CurrentEventsByPersistenceIdTypedQuery.scala   |  41 ++++
 .../scaladsl/EventsByPersistenceIdTypedQuery.scala |  48 ++++
 .../query/internal/QuerySerializerSpec.scala       |  16 ++
 .../testkit/internal/PersistenceProbeImpl.scala    |   4 +-
 .../internal/TypedEventsByPersistenceIdStage.scala | 116 ++++++++++
 .../javadsl/PersistenceTestKitReadJournal.scala    |  23 +-
 .../scaladsl/PersistenceTestKitReadJournal.scala   |  64 +++++-
 .../testkit/query/CurrentEventsBySlicesSpec.scala  |  22 ++
 .../testkit/query/EventsByPersistenceIdSpec.scala  |  23 +-
 .../query/EventsByPersistenceIdTypedSpec.scala     | 114 ++++++++++
 .../typed/scaladsl/EventSourcedBehaviorSpec.scala  |  22 ++
 .../scaladsl/EventSourcedBehaviorWatchSpec.scala   |   2 +-
 .../tags-for-state.excludes                        |  22 ++
 .../persistence/typed/internal/BehaviorSetup.scala |   2 +-
 .../typed/internal/EventSourcedBehaviorImpl.scala  |   5 +-
 .../pekko/persistence/typed/internal/Running.scala |  11 +-
 .../typed/javadsl/EventSourcedBehavior.scala       |  18 +-
 .../typed/scaladsl/EventSourcedBehavior.scala      |   7 +
 25 files changed, 983 insertions(+), 31 deletions(-)

diff --git 
a/persistence-query/src/main/java/org/apache/pekko/persistence/query/internal/protobuf/QueryMessages.java
 
b/persistence-query/src/main/java/org/apache/pekko/persistence/query/internal/protobuf/QueryMessages.java
index bc26bed3e5..32ffcaee5e 100644
--- 
a/persistence-query/src/main/java/org/apache/pekko/persistence/query/internal/protobuf/QueryMessages.java
+++ 
b/persistence-query/src/main/java/org/apache/pekko/persistence/query/internal/protobuf/QueryMessages.java
@@ -239,6 +239,36 @@ public final class QueryMessages extends 
org.apache.pekko.protobufv3.internal.Ge
      * @return The bytes for source.
      */
     org.apache.pekko.protobufv3.internal.ByteString getSourceBytes();
+
+    /**
+     * <code>repeated string tags = 12;</code>
+     *
+     * @return A list containing the tags.
+     */
+    java.util.List<java.lang.String> getTagsList();
+
+    /**
+     * <code>repeated string tags = 12;</code>
+     *
+     * @return The count of tags.
+     */
+    int getTagsCount();
+
+    /**
+     * <code>repeated string tags = 12;</code>
+     *
+     * @param index The index of the element to return.
+     * @return The tags at the given index.
+     */
+    java.lang.String getTags(int index);
+
+    /**
+     * <code>repeated string tags = 12;</code>
+     *
+     * @param index The index of the value to return.
+     * @return The bytes of the tags at the given index.
+     */
+    org.apache.pekko.protobufv3.internal.ByteString getTagsBytes(int index);
   }
 
   /**
@@ -279,6 +309,7 @@ public final class QueryMessages extends 
org.apache.pekko.protobufv3.internal.Ge
       offset_ = "";
       offsetManifest_ = "";
       source_ = "";
+      tags_ = org.apache.pekko.protobufv3.internal.LazyStringArrayList.EMPTY;
     }
 
     public static final 
org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
@@ -734,6 +765,47 @@ public final class QueryMessages extends 
org.apache.pekko.protobufv3.internal.Ge
       }
     }
 
+    public static final int TAGS_FIELD_NUMBER = 12;
+    private org.apache.pekko.protobufv3.internal.LazyStringList tags_;
+
+    /**
+     * <code>repeated string tags = 12;</code>
+     *
+     * @return A list containing the tags.
+     */
+    public org.apache.pekko.protobufv3.internal.ProtocolStringList 
getTagsList() {
+      return tags_;
+    }
+
+    /**
+     * <code>repeated string tags = 12;</code>
+     *
+     * @return The count of tags.
+     */
+    public int getTagsCount() {
+      return tags_.size();
+    }
+
+    /**
+     * <code>repeated string tags = 12;</code>
+     *
+     * @param index The index of the element to return.
+     * @return The tags at the given index.
+     */
+    public java.lang.String getTags(int index) {
+      return tags_.get(index);
+    }
+
+    /**
+     * <code>repeated string tags = 12;</code>
+     *
+     * @param index The index of the value to return.
+     * @return The bytes of the tags at the given index.
+     */
+    public org.apache.pekko.protobufv3.internal.ByteString getTagsBytes(int 
index) {
+      return tags_.getByteString(index);
+    }
+
     private byte memoizedIsInitialized = -1;
 
     @java.lang.Override
@@ -824,6 +896,10 @@ public final class QueryMessages extends 
org.apache.pekko.protobufv3.internal.Ge
       if (((bitField0_ & 0x00000400) != 0)) {
         
org.apache.pekko.protobufv3.internal.GeneratedMessage.writeString(output, 11, 
source_);
       }
+      for (int i = 0; i < tags_.size(); i++) {
+        org.apache.pekko.protobufv3.internal.GeneratedMessage.writeString(
+            output, 12, tags_.getRaw(i));
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -879,6 +955,14 @@ public final class QueryMessages extends 
org.apache.pekko.protobufv3.internal.Ge
         size +=
             
org.apache.pekko.protobufv3.internal.GeneratedMessage.computeStringSize(11, 
source_);
       }
+      {
+        int dataSize = 0;
+        for (int i = 0; i < tags_.size(); i++) {
+          dataSize += computeStringSizeNoTag(tags_.getRaw(i));
+        }
+        size += dataSize;
+        size += 1 * getTagsList().size();
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSize = size;
       return size;
@@ -941,6 +1025,7 @@ public final class QueryMessages extends 
org.apache.pekko.protobufv3.internal.Ge
       if (hasSource()) {
         if (!getSource().equals(other.getSource())) return false;
       }
+      if (!getTagsList().equals(other.getTagsList())) return false;
       if (!getUnknownFields().equals(other.getUnknownFields())) return false;
       return true;
     }
@@ -998,6 +1083,10 @@ public final class QueryMessages extends 
org.apache.pekko.protobufv3.internal.Ge
         hash = (37 * hash) + SOURCE_FIELD_NUMBER;
         hash = (53 * hash) + getSource().hashCode();
       }
+      if (getTagsCount() > 0) {
+        hash = (37 * hash) + TAGS_FIELD_NUMBER;
+        hash = (53 * hash) + getTagsList().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1191,6 +1280,8 @@ public final class QueryMessages extends 
org.apache.pekko.protobufv3.internal.Ge
         }
         filtered_ = false;
         source_ = "";
+        tags_ = org.apache.pekko.protobufv3.internal.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000800);
         return this;
       }
 
@@ -1279,6 +1370,11 @@ public final class QueryMessages extends 
org.apache.pekko.protobufv3.internal.Ge
           result.source_ = source_;
           to_bitField0_ |= 0x00000400;
         }
+        if (((from_bitField0_ & 0x00000800) != 0)) {
+          tags_ = tags_.getUnmodifiableView();
+          bitField0_ = (bitField0_ & ~0x00000800);
+        }
+        result.tags_ = tags_;
         result.bitField0_ |= to_bitField0_;
       }
 
@@ -1344,6 +1440,16 @@ public final class QueryMessages extends 
org.apache.pekko.protobufv3.internal.Ge
           bitField0_ |= 0x00000400;
           onChanged();
         }
+        if (!other.tags_.isEmpty()) {
+          if (tags_.isEmpty()) {
+            tags_ = other.tags_;
+            bitField0_ = (bitField0_ & ~0x00000800);
+          } else {
+            ensureTagsIsMutable();
+            tags_.addAll(other.tags_);
+          }
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         onChanged();
         return this;
@@ -1468,6 +1574,13 @@ public final class QueryMessages extends 
org.apache.pekko.protobufv3.internal.Ge
                   bitField0_ |= 0x00000400;
                   break;
                 } // case 90
+              case 98:
+                {
+                  java.lang.String s = input.readStringRequireUtf8();
+                  ensureTagsIsMutable();
+                  tags_.add(s);
+                  break;
+                } // case 98
               default:
                 {
                   if (!super.parseUnknownField(input, extensionRegistry, tag)) 
{
@@ -2394,6 +2507,128 @@ public final class QueryMessages extends 
org.apache.pekko.protobufv3.internal.Ge
         return this;
       }
 
+      private org.apache.pekko.protobufv3.internal.LazyStringList tags_ =
+          org.apache.pekko.protobufv3.internal.LazyStringArrayList.EMPTY;
+
+      private void ensureTagsIsMutable() {
+        if (!((bitField0_ & 0x00000800) != 0)) {
+          tags_ = new 
org.apache.pekko.protobufv3.internal.LazyStringArrayList(tags_);
+          bitField0_ |= 0x00000800;
+        }
+      }
+
+      /**
+       * <code>repeated string tags = 12;</code>
+       *
+       * @return A list containing the tags.
+       */
+      public org.apache.pekko.protobufv3.internal.ProtocolStringList 
getTagsList() {
+        return tags_.getUnmodifiableView();
+      }
+
+      /**
+       * <code>repeated string tags = 12;</code>
+       *
+       * @return The count of tags.
+       */
+      public int getTagsCount() {
+        return tags_.size();
+      }
+
+      /**
+       * <code>repeated string tags = 12;</code>
+       *
+       * @param index The index of the element to return.
+       * @return The tags at the given index.
+       */
+      public java.lang.String getTags(int index) {
+        return tags_.get(index);
+      }
+
+      /**
+       * <code>repeated string tags = 12;</code>
+       *
+       * @param index The index of the value to return.
+       * @return The bytes of the tags at the given index.
+       */
+      public org.apache.pekko.protobufv3.internal.ByteString getTagsBytes(int 
index) {
+        return tags_.getByteString(index);
+      }
+
+      /**
+       * <code>repeated string tags = 12;</code>
+       *
+       * @param index The index to set the value at.
+       * @param value The tags to set.
+       * @return This builder for chaining.
+       */
+      public Builder setTags(int index, java.lang.String value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureTagsIsMutable();
+        tags_.set(index, value);
+        onChanged();
+        return this;
+      }
+
+      /**
+       * <code>repeated string tags = 12;</code>
+       *
+       * @param value The tags to add.
+       * @return This builder for chaining.
+       */
+      public Builder addTags(java.lang.String value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureTagsIsMutable();
+        tags_.add(value);
+        onChanged();
+        return this;
+      }
+
+      /**
+       * <code>repeated string tags = 12;</code>
+       *
+       * @param values The tags to add.
+       * @return This builder for chaining.
+       */
+      public Builder addAllTags(java.lang.Iterable<java.lang.String> values) {
+        ensureTagsIsMutable();
+        
org.apache.pekko.protobufv3.internal.AbstractMessageLite.Builder.addAll(values, 
tags_);
+        onChanged();
+        return this;
+      }
+
+      /**
+       * <code>repeated string tags = 12;</code>
+       *
+       * @return This builder for chaining.
+       */
+      public Builder clearTags() {
+        tags_ = org.apache.pekko.protobufv3.internal.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000800);
+        onChanged();
+        return this;
+      }
+
+      /**
+       * <code>repeated string tags = 12;</code>
+       *
+       * @param value The bytes of the tags to add.
+       * @return This builder for chaining.
+       */
+      public Builder 
addTagsBytes(org.apache.pekko.protobufv3.internal.ByteString value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureTagsIsMutable();
+        tags_.add(value);
+        onChanged();
+        return this;
+      }
+
       // 
@@protoc_insertion_point(builder_scope:org.apache.pekko.persistence.query.EventEnvelope)
     }
 
@@ -2466,7 +2701,7 @@ public final class QueryMessages extends 
org.apache.pekko.protobufv3.internal.Ge
     java.lang.String[] descriptorData = {
       "\n"
           + "\023QueryMessages.proto\022\"org.apache.pekko."
-          + "persistence.query\032\026ContainerFormats.proto\"\363\001\n\r"
+          + "persistence.query\032\026ContainerFormats.proto\"\201\002\n\r"
           + "EventEnvelope\022\026\n"
           + "\016persistence_id\030\001 \002(\t\022\023\n"
           + "\013entity_type\030\002 \002(\t\022\r\n"
@@ -2479,7 +2714,8 @@ public final class QueryMessages extends 
org.apache.pekko.protobufv3.internal.Ge
           + "\010metadata\030\t \001(\0132\010.Payload\022\020\n"
           + "\010filtered\030\n"
           + " \001(\010\022\016\n"
-          + "\006source\030\013 \001(\tB8\n"
+          + "\006source\030\013 \001(\t\022\014\n"
+          + "\004tags\030\014 \003(\tB8\n"
           + "4org.apache.pekko.persistence.query.internal.protobufH\001"
     };
     descriptor =
@@ -2506,6 +2742,7 @@ public final class QueryMessages extends 
org.apache.pekko.protobufv3.internal.Ge
               "Metadata",
               "Filtered",
               "Source",
+              "Tags",
             });
     descriptor.resolveAllFeaturesImmutable();
     org.apache.pekko.remote.ContainerFormats.getDescriptor();
diff --git 
a/persistence-query/src/main/mima-filters/2.0.x.backwards.excludes/typed-event-envelope-tags.excludes
 
b/persistence-query/src/main/mima-filters/2.0.x.backwards.excludes/typed-event-envelope-tags.excludes
new file mode 100644
index 0000000000..dc01137b4c
--- /dev/null
+++ 
b/persistence-query/src/main/mima-filters/2.0.x.backwards.excludes/typed-event-envelope-tags.excludes
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Internal protobuf classes
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.query.internal.protobuf.QueryMessages#EventEnvelopeOrBuilder.getTags")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.query.internal.protobuf.QueryMessages#EventEnvelopeOrBuilder.getTagsList")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.query.internal.protobuf.QueryMessages#EventEnvelopeOrBuilder.getTagsCount")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.query.internal.protobuf.QueryMessages#EventEnvelopeOrBuilder.getTagsBytes")
+
+# New constructors and methods in typed EventEnvelope
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.query.typed.EventEnvelope.this")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.query.typed.EventEnvelope.apply")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.query.typed.EventEnvelope.create")
diff --git a/persistence-query/src/main/protobuf/QueryMessages.proto 
b/persistence-query/src/main/protobuf/QueryMessages.proto
index f3c3d192f7..02a3e66e65 100644
--- a/persistence-query/src/main/protobuf/QueryMessages.proto
+++ b/persistence-query/src/main/protobuf/QueryMessages.proto
@@ -32,4 +32,5 @@ message EventEnvelope {
   optional Payload metadata = 9;
   optional bool filtered = 10;
   optional string source = 11;
+  repeated string tags = 12;
 }
diff --git 
a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/internal/QuerySerializer.scala
 
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/internal/QuerySerializer.scala
index 6e0538bf27..25ffdeabc1 100644
--- 
a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/internal/QuerySerializer.scala
+++ 
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/internal/QuerySerializer.scala
@@ -91,6 +91,11 @@ import pekko.serialization.Serializers
       if (env.source.nonEmpty)
         builder.setSource(env.source)
 
+      if (env.tags.nonEmpty) {
+        import scala.jdk.CollectionConverters._
+        builder.addAllTags(env.tags.asJava)
+      }
+
       builder.build().toByteArray()
 
     case offset: Offset =>
@@ -115,6 +120,12 @@ import pekko.serialization.Serializers
 
       val filtered = env.hasFiltered && env.getFiltered
       val source = if (env.hasSource) env.getSource else ""
+      val tags =
+        if (env.getTagsList.isEmpty) Set.empty[String]
+        else {
+          import scala.jdk.CollectionConverters._
+          env.getTagsList.iterator.asScala.toSet
+        }
 
       new EventEnvelope(
         offset,
@@ -126,7 +137,8 @@ import pekko.serialization.Serializers
         env.getEntityType,
         env.getSlice,
         filtered,
-        source)
+        source,
+        tags)
 
     case _ =>
       fromStorageRepresentation(new String(bytes, UTF_8), manifest)
diff --git 
a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/EventEnvelope.scala
 
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/EventEnvelope.scala
index 551d46526d..23cdbea46d 100644
--- 
a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/EventEnvelope.scala
+++ 
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/EventEnvelope.scala
@@ -14,6 +14,7 @@
 package org.apache.pekko.persistence.query.typed
 
 import java.util.Optional
+import java.util.{ Set => JSet }
 
 import org.apache.pekko
 import pekko.annotation.ApiMayChange
@@ -21,6 +22,31 @@ import pekko.persistence.query.Offset
 import pekko.util.HashCode
 
 object EventEnvelope {
+
+  def apply[Event](
+      offset: Offset,
+      persistenceId: String,
+      sequenceNr: Long,
+      event: Event,
+      timestamp: Long,
+      entityType: String,
+      slice: Int,
+      filtered: Boolean,
+      source: String,
+      tags: Set[String]): EventEnvelope[Event] =
+    new EventEnvelope(
+      offset,
+      persistenceId,
+      sequenceNr,
+      Option(event),
+      timestamp,
+      None,
+      entityType,
+      slice,
+      filtered,
+      source,
+      tags)
+
   def apply[Event](
       offset: Offset,
       persistenceId: String,
@@ -53,6 +79,21 @@ object EventEnvelope {
       filtered,
       source)
 
+  def create[Event](
+      offset: Offset,
+      persistenceId: String,
+      sequenceNr: Long,
+      event: Event,
+      timestamp: Long,
+      entityType: String,
+      slice: Int,
+      filtered: Boolean,
+      source: String,
+      tags: JSet[String]): EventEnvelope[Event] = {
+    import scala.jdk.CollectionConverters._
+    apply(offset, persistenceId, sequenceNr, event, timestamp, entityType, 
slice, filtered, source, tags.asScala.toSet)
+  }
+
   def create[Event](
       offset: Offset,
       persistenceId: String,
@@ -104,7 +145,32 @@ final class EventEnvelope[Event](
     val entityType: String,
     val slice: Int,
     val filtered: Boolean,
-    val source: String) {
+    val source: String,
+    val tags: Set[String]) {
+
+  def this(
+      offset: Offset,
+      persistenceId: String,
+      sequenceNr: Long,
+      eventOption: Option[Event],
+      timestamp: Long,
+      eventMetadata: Option[Any],
+      entityType: String,
+      slice: Int,
+      filtered: Boolean,
+      source: String) =
+    this(
+      offset,
+      persistenceId,
+      sequenceNr,
+      eventOption,
+      timestamp,
+      eventMetadata,
+      entityType,
+      slice,
+      filtered,
+      source,
+      tags = Set.empty)
 
   def this(
       offset: Offset,
@@ -152,6 +218,14 @@ final class EventEnvelope[Event](
     eventMetadata.toJava.asInstanceOf[Optional[AnyRef]]
   }
 
+  /**
+   * Java API:
+   */
+  def getTags(): JSet[String] = {
+    import scala.jdk.CollectionConverters._
+    tags.asJava
+  }
+
   override def hashCode(): Int = {
     var result = HashCode.SEED
     result = HashCode.hash(result, offset)
@@ -164,7 +238,8 @@ final class EventEnvelope[Event](
     case other: EventEnvelope[_] =>
       offset == other.offset && persistenceId == other.persistenceId && 
sequenceNr == other.sequenceNr &&
       eventOption == other.eventOption && timestamp == other.timestamp && 
eventMetadata == other.eventMetadata &&
-      entityType == other.entityType && slice == other.slice && filtered == 
other.filtered
+      entityType == other.entityType && slice == other.slice && filtered == 
other.filtered &&
+      tags == other.tags
     case _ => false
   }
 
@@ -177,6 +252,7 @@ final class EventEnvelope[Event](
       case Some(meta) => meta.getClass.getName
       case None       => ""
     }
-    
s"EventEnvelope($offset,$persistenceId,$sequenceNr,$eventStr,$timestamp,$metaStr,$entityType,$slice,$filtered,$source)"
+    
s"EventEnvelope($offset,$persistenceId,$sequenceNr,$eventStr,$timestamp,$metaStr,$entityType,$slice,$filtered,$source,${tags
+        .mkString("[", ", ", "]")})"
   }
 }
diff --git 
a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/javadsl/CurrentEventsByPersistenceIdTypedQuery.scala
 
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/javadsl/CurrentEventsByPersistenceIdTypedQuery.scala
new file mode 100644
index 0000000000..75e57df815
--- /dev/null
+++ 
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/javadsl/CurrentEventsByPersistenceIdTypedQuery.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2021-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.query.typed.javadsl
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.annotation.ApiMayChange
+import pekko.persistence.query.javadsl.ReadJournal
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.stream.javadsl.Source
+
+/**
+ * A plugin may optionally support this query by implementing this trait.
+ *
+ * API May Change
+ */
+@ApiMayChange
+trait CurrentEventsByPersistenceIdTypedQuery extends ReadJournal {
+
+  /**
+   * Same as [[EventsByPersistenceIdTypedQuery.eventsByPersistenceIdTyped]] 
but the stream is completed
+   * immediately when it reaches the end of the "current" events. Events that 
are stored after the
+   * query is started are not included in the stream.
+   */
+  def currentEventsByPersistenceIdTyped[Event](
+      persistenceId: String,
+      fromSequenceNr: Long,
+      toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed]
+
+}
diff --git 
a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/javadsl/EventsByPersistenceIdTypedQuery.scala
 
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/javadsl/EventsByPersistenceIdTypedQuery.scala
new file mode 100644
index 0000000000..6e925e92d7
--- /dev/null
+++ 
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/javadsl/EventsByPersistenceIdTypedQuery.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2021-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.query.typed.javadsl
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.annotation.ApiMayChange
+import pekko.persistence.query.javadsl.ReadJournal
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.stream.javadsl.Source
+
+/**
+ * A plugin may optionally support this query by implementing this trait.
+ *
+ * API May Change
+ */
+@ApiMayChange
+trait EventsByPersistenceIdTypedQuery extends ReadJournal {
+
+  /**
+   * Query events for a specific `PersistenceId`.
+   *
+   * Events are emitted in the order they were stored. The stream also emits 
events that are persisted
+   * after the query is started. The stream is not completed when it reaches 
the end of the currently stored
+   * events, but it continues to push new events when new events are 
persisted. Corresponding query that is
+   * completed when it reaches the end of the currently stored events is 
provided by
+   * 
[[CurrentEventsByPersistenceIdTypedQuery.currentEventsByPersistenceIdTyped]].
+   *
+   * The `fromSequenceNr` and `toSequenceNr` can be used to limit what 
sequence numbers the returned stream
+   * will contain. Both sides are inclusive. `0` and `Long.MaxValue` are used 
to signify no lower/upper bound.
+   */
+  def eventsByPersistenceIdTyped[Event](
+      persistenceId: String,
+      fromSequenceNr: Long,
+      toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed]
+
+}
diff --git 
a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/scaladsl/CurrentEventsByPersistenceIdTypedQuery.scala
 
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/scaladsl/CurrentEventsByPersistenceIdTypedQuery.scala
new file mode 100644
index 0000000000..f07776d386
--- /dev/null
+++ 
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/scaladsl/CurrentEventsByPersistenceIdTypedQuery.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2021-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.query.typed.scaladsl
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.annotation.ApiMayChange
+import pekko.persistence.query.scaladsl.ReadJournal
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.stream.scaladsl.Source
+
+/**
+ * A plugin may optionally support this query by implementing this trait.
+ *
+ * API May Change
+ */
+@ApiMayChange
+trait CurrentEventsByPersistenceIdTypedQuery extends ReadJournal {
+
+  /**
+   * Same as [[EventsByPersistenceIdTypedQuery.eventsByPersistenceIdTyped]] 
but the stream is completed
+   * immediately when it reaches the end of the "current" events. Events that 
are stored after the
+   * query is started are not included in the stream.
+   */
+  def currentEventsByPersistenceIdTyped[Event](
+      persistenceId: String,
+      fromSequenceNr: Long,
+      toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed]
+
+}
diff --git 
a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/scaladsl/EventsByPersistenceIdTypedQuery.scala
 
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/scaladsl/EventsByPersistenceIdTypedQuery.scala
new file mode 100644
index 0000000000..9e05b1751f
--- /dev/null
+++ 
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/scaladsl/EventsByPersistenceIdTypedQuery.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2021-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.query.typed.scaladsl
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.annotation.ApiMayChange
+import pekko.persistence.query.scaladsl.ReadJournal
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.stream.scaladsl.Source
+
+/**
+ * A plugin may optionally support this query by implementing this trait.
+ *
+ * API May Change
+ */
+@ApiMayChange
+trait EventsByPersistenceIdTypedQuery extends ReadJournal {
+
+  /**
+   * Query events for a specific `PersistenceId`.
+   *
+   * Events are emitted in the order they were stored. The stream also emits 
events that are persisted
+   * after the query is started. The stream is not completed when it reaches 
the end of the currently stored
+   * events, but it continues to push new events when new events are 
persisted. Corresponding query that is
+   * completed when it reaches the end of the currently stored events is 
provided by
+   * 
[[CurrentEventsByPersistenceIdTypedQuery.currentEventsByPersistenceIdTyped]].
+   *
+   * The `fromSequenceNr` and `toSequenceNr` can be used to limit what 
sequence numbers the returned stream
+   * will contain. Both sides are inclusive. `0` and `Long.MaxValue` are used 
to signify no lower/upper bound.
+   */
+  def eventsByPersistenceIdTyped[Event](
+      persistenceId: String,
+      fromSequenceNr: Long,
+      toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed]
+
+}
diff --git 
a/persistence-query/src/test/scala/org/apache/pekko/persistence/query/internal/QuerySerializerSpec.scala
 
b/persistence-query/src/test/scala/org/apache/pekko/persistence/query/internal/QuerySerializerSpec.scala
index 1d9268d226..8d0afba57f 100644
--- 
a/persistence-query/src/test/scala/org/apache/pekko/persistence/query/internal/QuerySerializerSpec.scala
+++ 
b/persistence-query/src/test/scala/org/apache/pekko/persistence/query/internal/QuerySerializerSpec.scala
@@ -91,6 +91,22 @@ class QuerySerializerSpec extends PekkoSpec {
           source = "backtracking"))
     }
 
+    "serialize EventEnvelope with source and tags" in {
+      verifySerialization(
+        new EventEnvelope(
+          Sequence(1L),
+          "TestEntity|id1",
+          3L,
+          Some("event1"),
+          System.currentTimeMillis(),
+          None,
+          "TestEntity",
+          5,
+          filtered = false,
+          source = "query",
+          tags = Set("tag1", "tag2")))
+    }
+
     "serialize Sequence Offset" in {
       verifySerialization(Sequence(0))
     }
diff --git 
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/PersistenceProbeImpl.scala
 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/PersistenceProbeImpl.scala
index dbe95098b1..d7ddd69043 100644
--- 
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/PersistenceProbeImpl.scala
+++ 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/PersistenceProbeImpl.scala
@@ -147,7 +147,7 @@ private[pekko] object PersistenceProbeImpl {
           case Persist(event) =>
             sequenceNr += 1
             state = eventHandler(state, event)
-            onEvent(event, sequenceNr, tagger(event))
+            onEvent(event, sequenceNr, tagger(state, event))
             shouldSnapshot = shouldSnapshot || snapshotRequested(event)
             sideEffect(sideEffects)
 
@@ -162,7 +162,7 @@ private[pekko] object PersistenceProbeImpl {
             eventsWithSeqNrs.foreach {
               case (event, seqNr) =>
                 // technically doesn't persist them atomically, but in tests 
that shouldn't matter
-                onEvent(event, seqNr, tagger(event))
+                onEvent(event, seqNr, tagger(state, event))
                 shouldSnapshot = shouldSnapshot || snapshotRequested(event)
             }
 
diff --git 
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/internal/TypedEventsByPersistenceIdStage.scala
 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/internal/TypedEventsByPersistenceIdStage.scala
new file mode 100644
index 0000000000..70698a9f7f
--- /dev/null
+++ 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/internal/TypedEventsByPersistenceIdStage.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2020-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.testkit.query.internal
+
+import java.time.Instant
+import java.time.temporal.ChronoUnit
+
+import org.apache.pekko
+import pekko.actor.ActorRef
+import pekko.annotation.InternalApi
+import pekko.persistence.Persistence
+import pekko.persistence.journal.Tagged
+import pekko.persistence.query.TimestampOffset
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.persistence.testkit.{ EventStorage, PersistenceTestKitPlugin }
+import pekko.persistence.typed.PersistenceId
+import pekko.stream.{ Attributes, Outlet, SourceShape }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, 
GraphStageLogicWithLogging, OutHandler }
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+final private[pekko] class TypedEventsByPersistenceIdStage[Event](
+    persistenceId: String,
+    fromSequenceNr: Long,
+    toSequenceNr: Long,
+    storage: EventStorage,
+    persistence: Persistence)
+    extends GraphStage[SourceShape[EventEnvelope[Event]]] {
+  val out: Outlet[EventEnvelope[Event]] = 
Outlet("TypedEventsByPersistenceIdSource")
+  override def shape: SourceShape[EventEnvelope[Event]] = SourceShape(out)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
{
+    new GraphStageLogicWithLogging(shape) with OutHandler {
+      private var currentSequenceNr = math.max(fromSequenceNr, 1)
+      private var stageActorRef: ActorRef = null
+      override def preStart(): Unit = {
+        stageActorRef = getStageActor(receiveNotifications).ref
+        materializer.system.eventStream.subscribe(stageActorRef, 
classOf[PersistenceTestKitPlugin.Write])
+      }
+
+      private def receiveNotifications(in: (ActorRef, Any)): Unit = {
+        val (_, msg) = in
+        msg match {
+          case PersistenceTestKitPlugin.Write(pid, toSequenceNr) if pid == 
persistenceId =>
+            if (toSequenceNr >= currentSequenceNr) {
+              tryPush()
+            }
+          case _ =>
+        }
+      }
+
+      private def tryPush(): Unit = {
+        if (isAvailable(out)) {
+          val event = storage.tryRead(persistenceId, currentSequenceNr, 
currentSequenceNr, 1)
+          log.debug("tryPush available. Query for {} {} result {}", 
currentSequenceNr, currentSequenceNr, event)
+          event.headOption match {
+            case Some(pr) =>
+              val timestamp = Instant.ofEpochMilli(pr.timestamp)
+              val readTimestamp = Instant.now().truncatedTo(ChronoUnit.MICROS)
+              val slice = persistence.sliceForPersistenceId(persistenceId)
+              val entityType = PersistenceId.extractEntityType(persistenceId)
+              val tags = pr.payload match {
+                case Tagged(_, t) => t
+                case _            => Set.empty[String]
+              }
+              val payload = pr.payload match {
+                case Tagged(p, _) => p
+                case p            => p
+              }
+              push(out,
+                EventEnvelope(
+                  TimestampOffset(timestamp, readTimestamp, 
Map(pr.persistenceId -> pr.sequenceNr)),
+                  pr.persistenceId,
+                  pr.sequenceNr,
+                  payload.asInstanceOf[Event],
+                  pr.timestamp,
+                  entityType,
+                  slice,
+                  filtered = false,
+                  source = "",
+                  tags = tags))
+              if (currentSequenceNr == toSequenceNr) {
+                completeStage()
+              } else {
+                currentSequenceNr += 1
+              }
+            case None =>
+          }
+        } else {
+          log.debug("tryPush, no demand")
+        }
+      }
+
+      override def onPull(): Unit = {
+        tryPush()
+      }
+
+      setHandler(out, this)
+    }
+
+  }
+
+}
diff --git 
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala
 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala
index 49f63684bb..15e11df159 100644
--- 
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala
+++ 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala
@@ -26,7 +26,12 @@ import pekko.persistence.query.javadsl.{
   ReadJournal
 }
 import pekko.persistence.query.typed
-import pekko.persistence.query.typed.javadsl.{ CurrentEventsBySliceQuery, 
EventsBySliceQuery }
+import pekko.persistence.query.typed.javadsl.{
+  CurrentEventsByPersistenceIdTypedQuery,
+  CurrentEventsBySliceQuery,
+  EventsByPersistenceIdTypedQuery,
+  EventsBySliceQuery
+}
 import pekko.persistence.testkit.query.scaladsl
 import pekko.stream.javadsl.Source
 
@@ -41,7 +46,9 @@ final class PersistenceTestKitReadJournal(delegate: 
scaladsl.PersistenceTestKitR
     with CurrentEventsByTagQuery
     with CurrentEventsBySliceQuery
     with EventsByTagQuery
-    with EventsBySliceQuery {
+    with EventsBySliceQuery
+    with EventsByPersistenceIdTypedQuery
+    with CurrentEventsByPersistenceIdTypedQuery {
 
   override def eventsByPersistenceId(
       persistenceId: String,
@@ -55,6 +62,18 @@ final class PersistenceTestKitReadJournal(delegate: 
scaladsl.PersistenceTestKitR
       toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
     delegate.currentEventsByPersistenceId(persistenceId, fromSequenceNr, 
toSequenceNr).asJava
 
+  override def eventsByPersistenceIdTyped[Event](
+      persistenceId: String,
+      fromSequenceNr: Long,
+      toSequenceNr: Long): Source[typed.EventEnvelope[Event], NotUsed] =
+    delegate.eventsByPersistenceIdTyped(persistenceId, fromSequenceNr, 
toSequenceNr).asJava
+
+  override def currentEventsByPersistenceIdTyped[Event](
+      persistenceId: String,
+      fromSequenceNr: Long,
+      toSequenceNr: Long): Source[typed.EventEnvelope[Event], NotUsed] =
+    delegate.currentEventsByPersistenceIdTyped(persistenceId, fromSequenceNr, 
toSequenceNr).asJava
+
   override def currentEventsByTag(tag: String, offset: Offset): 
Source[EventEnvelope, NotUsed] =
     delegate.currentEventsByTag(tag, offset).asJava
 
diff --git 
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala
 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala
index 73c7945160..4d7d6e8e70 100644
--- 
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala
+++ 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala
@@ -8,10 +8,12 @@
  */
 
 /*
- * Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2020-2023 Lightbend Inc. <https://www.lightbend.com>
  */
 
 package org.apache.pekko.persistence.testkit.query.scaladsl
+import java.time.Instant
+import java.time.temporal.ChronoUnit
 import scala.annotation.nowarn
 import scala.collection.immutable
 
@@ -23,6 +25,7 @@ import pekko.persistence.journal.Tagged
 import pekko.persistence.query.{ EventEnvelope, Sequence }
 import pekko.persistence.query.NoOffset
 import pekko.persistence.query.Offset
+import pekko.persistence.query.TimestampOffset
 import pekko.persistence.query.scaladsl.{
   CurrentEventsByPersistenceIdQuery,
   CurrentEventsByTagQuery,
@@ -32,13 +35,16 @@ import pekko.persistence.query.scaladsl.{
 }
 import pekko.persistence.query.scaladsl.EventsByTagQuery
 import pekko.persistence.query.typed
+import 
pekko.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery
 import pekko.persistence.query.typed.scaladsl.CurrentEventsBySliceQuery
+import pekko.persistence.query.typed.scaladsl.EventsByPersistenceIdTypedQuery
 import pekko.persistence.query.typed.scaladsl.EventsBySliceQuery
 import pekko.persistence.testkit.EventStorage
 import pekko.persistence.testkit.internal.InMemStorageExtension
 import pekko.persistence.testkit.query.internal.EventsByPersistenceIdStage
 import pekko.persistence.testkit.query.internal.EventsBySliceStage
 import pekko.persistence.testkit.query.internal.EventsByTagStage
+import pekko.persistence.testkit.query.internal.TypedEventsByPersistenceIdStage
 import pekko.persistence.typed.PersistenceId
 import pekko.stream.scaladsl.Source
 
@@ -59,7 +65,9 @@ final class PersistenceTestKitReadJournal(system: 
ExtendedActorSystem, @nowarn("
     with CurrentEventsBySliceQuery
     with PagedPersistenceIdsQuery
     with EventsByTagQuery
-    with EventsBySliceQuery {
+    with EventsBySliceQuery
+    with EventsByPersistenceIdTypedQuery
+    with CurrentEventsByPersistenceIdTypedQuery {
 
   private val log = LoggerFactory.getLogger(getClass)
 
@@ -77,6 +85,17 @@ final class PersistenceTestKitReadJournal(system: 
ExtendedActorSystem, @nowarn("
     case payload            => payload
   }
 
+  private def tagsFor(payload: Any): Set[String] = payload match {
+    case Tagged(_, tags) => tags
+    case _               => Set.empty
+  }
+
+  private def timestampOffsetFor(pr: pekko.persistence.PersistentRepr): 
TimestampOffset = {
+    val timestamp = Instant.ofEpochMilli(pr.timestamp)
+    val readTimestamp = Instant.now().truncatedTo(ChronoUnit.MICROS)
+    TimestampOffset(timestamp, readTimestamp, Map(pr.persistenceId -> 
pr.sequenceNr))
+  }
+
   override def eventsByPersistenceId(
       persistenceId: String,
       fromSequenceNr: Long = 0,
@@ -99,6 +118,35 @@ final class PersistenceTestKitReadJournal(system: 
ExtendedActorSystem, @nowarn("
     }
   }
 
+  override def eventsByPersistenceIdTyped[Event](
+      persistenceId: String,
+      fromSequenceNr: Long = 0,
+      toSequenceNr: Long = Long.MaxValue): Source[typed.EventEnvelope[Event], 
NotUsed] = {
+    Source.fromGraph(
+      new TypedEventsByPersistenceIdStage[Event](persistenceId, 
fromSequenceNr, toSequenceNr, storage, persistence))
+  }
+
+  override def currentEventsByPersistenceIdTyped[Event](
+      persistenceId: String,
+      fromSequenceNr: Long = 0,
+      toSequenceNr: Long = Long.MaxValue): Source[typed.EventEnvelope[Event], 
NotUsed] = {
+    val slice = persistence.sliceForPersistenceId(persistenceId)
+    val entityType = PersistenceId.extractEntityType(persistenceId)
+    Source(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, 
Long.MaxValue)).map { pr =>
+      typed.EventEnvelope(
+        timestampOffsetFor(pr),
+        persistenceId,
+        pr.sequenceNr,
+        unwrapTaggedPayload(pr.payload).asInstanceOf[Event],
+        pr.timestamp,
+        entityType,
+        slice,
+        filtered = false,
+        source = "",
+        tags = tagsFor(pr.payload))
+    }
+  }
+
   override def currentEventsByTag(tag: String, offset: Offset = NoOffset): 
Source[EventEnvelope, NotUsed] = {
     offset match {
       case NoOffset =>
@@ -134,15 +182,17 @@ final class PersistenceTestKitReadJournal(system: 
ExtendedActorSystem, @nowarn("
       })
     Source(prs).map { pr =>
       val slice = persistence.sliceForPersistenceId(pr.persistenceId)
-      new typed.EventEnvelope[Event](
-        Sequence(pr.sequenceNr),
+      typed.EventEnvelope(
+        timestampOffsetFor(pr),
         pr.persistenceId,
         pr.sequenceNr,
-        Some(pr.payload.asInstanceOf[Event]),
+        unwrapTaggedPayload(pr.payload).asInstanceOf[Event],
         pr.timestamp,
-        pr.metadata,
         entityType,
-        slice)
+        slice,
+        filtered = false,
+        source = "",
+        tags = tagsFor(pr.payload))
     }
   }
 
diff --git 
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/CurrentEventsBySlicesSpec.scala
 
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/CurrentEventsBySlicesSpec.scala
index ad4482667f..82365c3b15 100644
--- 
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/CurrentEventsBySlicesSpec.scala
+++ 
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/CurrentEventsBySlicesSpec.scala
@@ -21,6 +21,7 @@ import pekko.actor.typed.ActorRef
 import pekko.persistence.Persistence
 import pekko.persistence.query.NoOffset
 import pekko.persistence.query.PersistenceQuery
+import pekko.persistence.query.TimestampOffset
 import pekko.persistence.testkit.query.EventsByPersistenceIdSpec.Command
 import pekko.persistence.testkit.query.EventsByPersistenceIdSpec.testBehavior
 import pekko.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
@@ -71,6 +72,27 @@ class CurrentEventsBySlicesSpec
         .futureValue
         .map(_.event) should ===(Seq("evt-1", "evt-2", "evt-3", "evt-4", 
"evt-5"))
     }
+
+    "include tags in events by slices" in {
+      val probe = createTestProbe[Done]()
+      val ref1 = spawn(testBehavior("TagTest|pid-1"))
+      ref1 ! Command("tag-me-evt-1", probe.ref)
+      ref1 ! Command("evt-2", probe.ref)
+      probe.receiveMessages(2)
+      val ref2 = spawn(testBehavior("TagTest|pid-2"))
+      ref2 ! Command("evt-3", probe.ref)
+      ref2 ! Command("tag-me-evt-4", probe.ref)
+      probe.receiveMessages(2)
+
+      val result = queries
+        .currentEventsBySlices[String]("TagTest", 0, 
Persistence(system).numberOfSlices - 1, NoOffset)
+        .runWith(Sink.seq)
+        .futureValue
+
+      result.head.offset shouldBe a[TimestampOffset]
+      result.map(e => (e.event, e.tags)) should ===(
+        Seq(("tag-me-evt-1", Set("tag")), ("evt-2", Set.empty), ("evt-3", 
Set.empty), ("tag-me-evt-4", Set("tag"))))
+    }
   }
 
 }
diff --git 
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/EventsByPersistenceIdSpec.scala
 
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/EventsByPersistenceIdSpec.scala
index 2865c65385..90f99321a3 100644
--- 
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/EventsByPersistenceIdSpec.scala
+++ 
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/EventsByPersistenceIdSpec.scala
@@ -20,6 +20,7 @@ import pekko.Done
 import pekko.actor.testkit.typed.scaladsl.{ LogCapturing, 
ScalaTestWithActorTestKit }
 import pekko.actor.typed.ActorRef
 import pekko.persistence.query.{ EventEnvelope, PersistenceQuery }
+import pekko.persistence.query.Sequence
 import pekko.persistence.testkit.PersistenceTestKitPlugin
 import pekko.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
 import pekko.persistence.typed.PersistenceId
@@ -49,7 +50,7 @@ object EventsByPersistenceIdSpec {
         Effect.persist(command.evt).thenRun { _ =>
           command.ack ! Done
         },
-      (state, _) => state)
+      (state, _) => state).withTagger(evt => if (evt.startsWith("tag-me-")) 
Set("tag") else Set.empty)
   }
 
 }
@@ -126,7 +127,9 @@ class EventsByPersistenceIdSpec
       val probe = src.runWith(TestSink[EventEnvelope]())
 
       probe.request(5)
-      probe.expectNext().timestamp should be > 0L
+      val envelope = probe.expectNext()
+      envelope.timestamp should be > 0L
+      envelope.offset shouldBe a[Sequence]
       probe.expectNext().timestamp should be > 0L
       probe.cancel()
     }
@@ -146,4 +149,20 @@ class EventsByPersistenceIdSpec
       probe.cancel()
     }
   }
+
+  "Persistent test kit query currentEventsByPersistenceId" must {
+    "include timestamp in EventEnvelope" in {
+      setup("n")
+
+      val src = queries.currentEventsByPersistenceId("n", 0L, Long.MaxValue)
+      val probe = src.runWith(TestSink[EventEnvelope]())
+
+      probe.request(5)
+      val envelope = probe.expectNext()
+      envelope.timestamp should be > 0L
+      envelope.offset shouldBe a[Sequence]
+      probe.expectNext().timestamp should be > 0L
+      probe.cancel()
+    }
+  }
 }
diff --git 
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/EventsByPersistenceIdTypedSpec.scala
 
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/EventsByPersistenceIdTypedSpec.scala
new file mode 100644
index 0000000000..9a1b56f8ba
--- /dev/null
+++ 
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/EventsByPersistenceIdTypedSpec.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2020-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.testkit.query
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorRef
+import pekko.persistence.Persistence
+import pekko.persistence.query.PersistenceQuery
+import pekko.persistence.query.TimestampOffset
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.persistence.testkit.PersistenceTestKitPlugin
+import pekko.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
+import pekko.persistence.typed.PersistenceId
+import pekko.persistence.typed.scaladsl.Effect
+import pekko.persistence.typed.scaladsl.EventSourcedBehavior
+import pekko.stream.scaladsl.Sink
+import pekko.stream.testkit.scaladsl.TestSink
+
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import com.typesafe.config.ConfigFactory
+
+object EventsByPersistenceIdTypedSpec {
+  val config = PersistenceTestKitPlugin.config.withFallback(
+    ConfigFactory.parseString("""
+    pekko.loglevel = DEBUG
+    pekko.loggers = ["org.apache.pekko.testkit.SilenceAllTestEventListener"]
+    pekko.persistence.testkit.events.serialize = off
+      """))
+
+  case class Command(evt: String, ack: ActorRef[Done])
+  case class State()
+
+  def testBehavior(persistenceId: String) = {
+    EventSourcedBehavior[Command, String, State](
+      PersistenceId.ofUniqueId(persistenceId),
+      State(),
+      (_, command) =>
+        Effect.persist(command.evt).thenRun { _ =>
+          command.ack ! Done
+        },
+      (state, _) => state).withTagger(evt => if (evt.startsWith("tag-me-")) 
Set("tag") else Set.empty)
+  }
+
+}
+
+class EventsByPersistenceIdTypedSpec
+    extends ScalaTestWithActorTestKit(EventsByPersistenceIdTypedSpec.config)
+    with LogCapturing
+    with AnyWordSpecLike {
+  import EventsByPersistenceIdTypedSpec._
+
+  implicit val classic: pekko.actor.ActorSystem = system.classicSystem
+
+  val queries =
+    
PersistenceQuery(system).readJournalFor[PersistenceTestKitReadJournal](PersistenceTestKitReadJournal.Identifier)
+
+  def setup(persistenceId: String): ActorRef[Command] = {
+    val probe = createTestProbe[Done]()
+    val ref = setupEmpty(persistenceId)
+    ref ! Command(s"$persistenceId-1", probe.ref)
+    ref ! Command(s"$persistenceId-2", probe.ref)
+    ref ! Command(s"$persistenceId-3", probe.ref)
+    probe.expectMessage(Done)
+    probe.expectMessage(Done)
+    probe.expectMessage(Done)
+    ref
+  }
+
+  def setupEmpty(persistenceId: String): ActorRef[Command] = {
+    spawn(testBehavior(persistenceId))
+  }
+
+  "Persistent test kit live query eventsByPersistenceIdTyped" must {
+    "find new events" in {
+      val ackProbe = createTestProbe[Done]()
+      val ref = setup("d")
+      val src = queries.eventsByPersistenceIdTyped[String]("d", 0L, 
Long.MaxValue)
+      val probe = src.runWith(TestSink[EventEnvelope[String]]())
+      probe.request(5)
+      probe.expectNextN(3)
+
+      ref ! Command("tag-me-d-4", ackProbe.ref)
+      ackProbe.expectMessage(Done)
+
+      val envelope = probe.expectNext()
+      envelope.offset shouldBe a[TimestampOffset]
+      envelope.event should ===("tag-me-d-4")
+      envelope.tags should ===(Set("tag"))
+      envelope.filtered should ===(false)
+      envelope.source should ===("")
+      envelope.slice should ===(Persistence(system).sliceForPersistenceId("d"))
+
+      val currentResult =
+        queries.currentEventsByPersistenceIdTyped[String]("d", 0L, 
Long.MaxValue).runWith(Sink.seq).futureValue
+      currentResult should have size 4
+      currentResult.last should ===(envelope)
+    }
+  }
+}
diff --git 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala
 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala
index c19a16fa75..6e3ad66614 100644
--- 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala
+++ 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala
@@ -553,6 +553,28 @@ class EventSourcedBehaviorSpec
       events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, 
Incremented(1), 0L))
     }
 
+    "tag events based on state" in {
+      val pid = nextPid()
+      val c = spawn(
+        Behaviors.setup[Command](ctx =>
+          counter(ctx, pid).withTaggerForState((state, _) =>
+            if (state.value <= 1) Set.empty
+            else Set("higher-than-one"))))
+      val replyProbe = TestProbe[State]()
+
+      c ! Increment
+      c ! GetValue(replyProbe.ref)
+      replyProbe.expectMessage(State(1, Vector(0)))
+
+      c ! Increment
+      c ! GetValue(replyProbe.ref)
+      replyProbe.expectMessage(State(2, Vector(0, 1)))
+
+      val events = queries.currentEventsByTag("higher-than-one", 
Offset.noOffset).runWith(Sink.seq).futureValue
+      events should have size 1
+      events.head shouldEqual EventEnvelope(Sequence(2), pid.id, 2, 
Incremented(1), 0L)
+    }
+
     "handle scheduled message arriving before recovery completed " in {
       val c = spawn(Behaviors.withTimers[Command] { timers =>
         timers.startSingleTimer(Increment, 1.millis)
diff --git 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
index ded56b0bf5..5556f79f12 100644
--- 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
+++ 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
@@ -76,7 +76,7 @@ class EventSourcedBehaviorWatchSpec
       eventHandler = (state, evt) => state + evt,
       WriterIdentity.newIdentity(),
       pf,
-      _ => Set.empty[String],
+      (_, _) => Set.empty[String],
       NoOpEventAdapter.instance[String],
       NoOpSnapshotAdapter.instance[String],
       snapshotWhen = ConstantFun.scalaAnyThreeToFalse,
diff --git 
a/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/tags-for-state.excludes
 
b/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/tags-for-state.excludes
new file mode 100644
index 0000000000..0537827a22
--- /dev/null
+++ 
b/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/tags-for-state.excludes
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# internal
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.typed.internal.Running#HandlingCommands.adaptEvent")
+# Bin incompatible, but no way around it because Java DSL is based on 
inheritance, hopefully nobody made
+# a method like this
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.scaladsl.EventSourcedBehavior.withTaggerForState")
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
index 38b94166a0..f8670c5bc1 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
@@ -61,7 +61,7 @@ private[pekko] final class BehaviorSetup[C, E, S](
     val eventHandler: EventSourcedBehavior.EventHandler[S, E],
     val writerIdentity: EventSourcedBehaviorImpl.WriterIdentity,
     private val signalHandler: PartialFunction[(S, Signal), Unit],
-    val tagger: E => Set[String],
+    val tagger: (S, E) => Set[String],
     val eventAdapter: EventAdapter[E, Any],
     val snapshotAdapter: SnapshotAdapter[S],
     val snapshotWhen: (S, E, Long) => Boolean,
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
index 75ebbb1b43..060e10f4b2 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
@@ -111,7 +111,7 @@ private[pekko] final case class 
EventSourcedBehaviorImpl[Command, Event, State](
     snapshotPluginId: Option[String] = None,
     journalPluginConfig: Option[Config] = None,
     snapshotPluginConfig: Option[Config] = None,
-    tagger: Event => Set[String] = (_: Event) => Set.empty[String],
+    tagger: (State, Event) => Set[String] = (_: State, _: Event) => 
Set.empty[String],
     eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event],
     snapshotAdapter: SnapshotAdapter[State] = 
NoOpSnapshotAdapter.instance[State],
     snapshotWhen: (State, Event, Long) => Boolean = 
ConstantFun.scalaAnyThreeToFalse,
@@ -285,6 +285,9 @@ private[pekko] final case class 
EventSourcedBehaviorImpl[Command, Event, State](
     copy(retention = criteria)
 
   override def withTagger(tagger: Event => Set[String]): 
EventSourcedBehavior[Command, Event, State] =
+    copy(tagger = (_, event) => tagger(event))
+
+  override def withTaggerForState(tagger: (State, Event) => Set[String]): 
EventSourcedBehavior[Command, Event, State] =
     copy(tagger = tagger)
 
   override def eventAdapter(adapter: EventAdapter[Event, _]): 
EventSourcedBehavior[Command, Event, State] =
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
index a43eb278b1..e2c88ee1ab 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
@@ -501,7 +501,7 @@ private[pekko] object Running {
       replication.setContext(recoveryRunning = false, event.originReplica, 
concurrent = isConcurrent)
 
       val stateAfterApply = state.applyEvent(setup, event.event)
-      val eventToPersist = adaptEvent(event.event)
+      val eventToPersist = adaptEvent(stateAfterApply.state, event.event)
       val eventAdapterManifest = setup.eventAdapter.manifest(event.event)
 
       replication.clearContext()
@@ -547,7 +547,7 @@ private[pekko] object Running {
         setup.replication.foreach(r => r.setContext(recoveryRunning = false, 
r.replicaId, concurrent = false))
 
         val stateAfterApply = state.applyEvent(setup, event)
-        val eventToPersist = adaptEvent(event)
+        val eventToPersist = adaptEvent(stateAfterApply.state, event)
         val eventAdapterManifest = setup.eventAdapter.manifest(event)
 
         val newState2: RunningState[S, C] = setup.replication match {
@@ -619,7 +619,6 @@ private[pekko] object Running {
             if (shouldSnapshotAfterPersist == NoSnapshot)
               shouldSnapshotAfterPersist = 
setup.shouldSnapshot(currentState.state, event, _currentSequenceNumber)
             val evtManifest = setup.eventAdapter.manifest(event)
-            val adaptedEvent = adaptEvent(event)
             val eventMetadata = metadataTemplate match {
               case Some(template) =>
                 val updatedVersion = 
currentState.version.updated(template.originReplica.id, _currentSequenceNumber)
@@ -635,6 +634,8 @@ private[pekko] object Running {
 
             currentState = currentState.applyEvent(setup, event)
 
+            val adaptedEvent = adaptEvent(currentState.state, event)
+
             eventsToPersist = EventToPersist(adaptedEvent, evtManifest, 
eventMetadata) :: eventsToPersist
           }
 
@@ -699,8 +700,8 @@ private[pekko] object Running {
       }
     }
 
-    def adaptEvent(event: E): Any = {
-      val tags = setup.tagger(event)
+    def adaptEvent(state: S, event: E): Any = {
+      val tags = setup.tagger(state, event)
       val adaptedEvent = setup.eventAdapter.toJournal(event)
       if (tags.isEmpty)
         adaptedEvent
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala
index 742ccceb11..811ea8465e 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala
@@ -178,10 +178,20 @@ abstract class EventSourcedBehavior[Command, Event, 
State] private[pekko] (
   def recovery: Recovery = Recovery.default
 
   /**
-   * The `tagger` function should give event tags, which will be used in 
persistence query
+   * Return tags to store for the given event, the tags can then be used in 
persistence query.
+   *
+   * If [[tagsFor(Event, State)]] is overridden this method is ignored.
    */
   def tagsFor(@nowarn("msg=never used") event: Event): java.util.Set[String] = 
Collections.emptySet()
 
+  /**
+   * Return tags to store for the given event and state, the tags can then be 
used in persistence query.
+   * The state passed to the tagger allows for toggling a tag with one event 
but keep all events after it tagged
+   * based on a property or the type of the state.
+   */
+  def tagsFor(@nowarn("msg=never used") state: State, event: Event): 
java.util.Set[String] =
+    tagsFor(event)
+
   /**
    * Transform the event in another type before giving to the journal. Can be 
used to wrap events
    * in types Journals understand but is of a different type than `Event`.
@@ -207,9 +217,9 @@ abstract class EventSourcedBehavior[Command, Event, State] 
private[pekko] (
       : scaladsl.EventSourcedBehavior[Command, Event, State] = {
     val snapshotWhen: (State, Event, Long) => Boolean = (state, event, seqNr) 
=> shouldSnapshot(state, event, seqNr)
 
-    val tagger: Event => Set[String] = { event =>
+    val tagger: (State, Event) => Set[String] = { (state, event) =>
       import scala.jdk.CollectionConverters._
-      val tags = tagsFor(event)
+      val tags = tagsFor(state, event)
       if (tags.isEmpty) Set.empty
       else tags.asScala.toSet
     }
@@ -224,7 +234,7 @@ abstract class EventSourcedBehavior[Command, Event, State] 
private[pekko] (
       getClass)
       .snapshotWhen(snapshotWhen)
       .withRetention(retentionCriteria.asScala)
-      .withTagger(tagger)
+      .withTaggerForState(tagger)
       .eventAdapter(eventAdapter())
       .snapshotAdapter(snapshotAdapter())
       .withJournalPluginId(journalPluginId)
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala
index 692072a12d..23184348ff 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala
@@ -204,6 +204,13 @@ object EventSourcedBehavior {
    */
   def withTagger(tagger: Event => Set[String]): EventSourcedBehavior[Command, 
Event, State]
 
+  /**
+   * The `tagger` function should give event tags, which will be used in 
persistence query.
+   * The state passed to the tagger allows for toggling a tag with one event 
but keep all events after it tagged
+   * based on a property or the type of the state.
+   */
+  def withTaggerForState(tagger: (State, Event) => Set[String]): 
EventSourcedBehavior[Command, Event, State]
+
   /**
    * Transform the event to another type before giving to the journal. Can be 
used to wrap events
    * in types Journals understand but is of a different type than `Event`.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to