This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new d87353432b Port akka PRs #31891, #31892, #31893: tags in
EventEnvelope, EventsByPersistenceIdTyped queries, withTaggerForState (#2920)
d87353432b is described below
commit d87353432b1e4b558538f7467e0d5590dfdca784
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Apr 27 12:51:36 2026 +0200
Port akka PRs #31891, #31892, #31893: tags in EventEnvelope,
EventsByPersistenceIdTyped queries, withTaggerForState (#2920)
* Port akka PRs #31891, #31892, #31893: tags in EventEnvelope,
EventsByPersistenceIdTyped queries, withTaggerForState
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/a96f8dd0-0966-4aea-86f6-9892c68ee5c1
Co-authored-by: pjfanning <[email protected]>
* Fix imports: use scala.jdk.CollectionConverters instead of
pekko.util.ccompat.JavaConverters
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/a96f8dd0-0966-4aea-86f6-9892c68ee5c1
Co-authored-by: pjfanning <[email protected]>
* scalafmt
* javafmt
* Fix PersistenceProbeImpl: call tagger(state, event) after signature
change to (S,E)=>Set[String]
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/80c7afcd-d6ae-4b03-83b6-207aedfdd8cd
Co-authored-by: pjfanning <[email protected]>
* Update typed-event-envelope-tags.excludes
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../query/internal/protobuf/QueryMessages.java | 241 ++++++++++++++++++++-
.../typed-event-envelope-tags.excludes | 27 +++
.../src/main/protobuf/QueryMessages.proto | 1 +
.../query/internal/QuerySerializer.scala | 14 +-
.../persistence/query/typed/EventEnvelope.scala | 82 ++++++-
.../CurrentEventsByPersistenceIdTypedQuery.scala | 41 ++++
.../javadsl/EventsByPersistenceIdTypedQuery.scala | 48 ++++
.../CurrentEventsByPersistenceIdTypedQuery.scala | 41 ++++
.../scaladsl/EventsByPersistenceIdTypedQuery.scala | 48 ++++
.../query/internal/QuerySerializerSpec.scala | 16 ++
.../testkit/internal/PersistenceProbeImpl.scala | 4 +-
.../internal/TypedEventsByPersistenceIdStage.scala | 116 ++++++++++
.../javadsl/PersistenceTestKitReadJournal.scala | 23 +-
.../scaladsl/PersistenceTestKitReadJournal.scala | 64 +++++-
.../testkit/query/CurrentEventsBySlicesSpec.scala | 22 ++
.../testkit/query/EventsByPersistenceIdSpec.scala | 23 +-
.../query/EventsByPersistenceIdTypedSpec.scala | 114 ++++++++++
.../typed/scaladsl/EventSourcedBehaviorSpec.scala | 22 ++
.../scaladsl/EventSourcedBehaviorWatchSpec.scala | 2 +-
.../tags-for-state.excludes | 22 ++
.../persistence/typed/internal/BehaviorSetup.scala | 2 +-
.../typed/internal/EventSourcedBehaviorImpl.scala | 5 +-
.../pekko/persistence/typed/internal/Running.scala | 11 +-
.../typed/javadsl/EventSourcedBehavior.scala | 18 +-
.../typed/scaladsl/EventSourcedBehavior.scala | 7 +
25 files changed, 983 insertions(+), 31 deletions(-)
diff --git
a/persistence-query/src/main/java/org/apache/pekko/persistence/query/internal/protobuf/QueryMessages.java
b/persistence-query/src/main/java/org/apache/pekko/persistence/query/internal/protobuf/QueryMessages.java
index bc26bed3e5..32ffcaee5e 100644
---
a/persistence-query/src/main/java/org/apache/pekko/persistence/query/internal/protobuf/QueryMessages.java
+++
b/persistence-query/src/main/java/org/apache/pekko/persistence/query/internal/protobuf/QueryMessages.java
@@ -239,6 +239,36 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
* @return The bytes for source.
*/
org.apache.pekko.protobufv3.internal.ByteString getSourceBytes();
+
+ /**
+ * <code>repeated string tags = 12;</code>
+ *
+ * @return A list containing the tags.
+ */
+ java.util.List<java.lang.String> getTagsList();
+
+ /**
+ * <code>repeated string tags = 12;</code>
+ *
+ * @return The count of tags.
+ */
+ int getTagsCount();
+
+ /**
+ * <code>repeated string tags = 12;</code>
+ *
+ * @param index The index of the element to return.
+ * @return The tags at the given index.
+ */
+ java.lang.String getTags(int index);
+
+ /**
+ * <code>repeated string tags = 12;</code>
+ *
+ * @param index The index of the value to return.
+ * @return The bytes of the tags at the given index.
+ */
+ org.apache.pekko.protobufv3.internal.ByteString getTagsBytes(int index);
}
/**
@@ -279,6 +309,7 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
offset_ = "";
offsetManifest_ = "";
source_ = "";
+ tags_ = org.apache.pekko.protobufv3.internal.LazyStringArrayList.EMPTY;
}
public static final
org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
@@ -734,6 +765,47 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
}
}
+ public static final int TAGS_FIELD_NUMBER = 12;
+ private org.apache.pekko.protobufv3.internal.LazyStringList tags_;
+
+ /**
+ * <code>repeated string tags = 12;</code>
+ *
+ * @return A list containing the tags.
+ */
+ public org.apache.pekko.protobufv3.internal.ProtocolStringList
getTagsList() {
+ return tags_;
+ }
+
+ /**
+ * <code>repeated string tags = 12;</code>
+ *
+ * @return The count of tags.
+ */
+ public int getTagsCount() {
+ return tags_.size();
+ }
+
+ /**
+ * <code>repeated string tags = 12;</code>
+ *
+ * @param index The index of the element to return.
+ * @return The tags at the given index.
+ */
+ public java.lang.String getTags(int index) {
+ return tags_.get(index);
+ }
+
+ /**
+ * <code>repeated string tags = 12;</code>
+ *
+ * @param index The index of the value to return.
+ * @return The bytes of the tags at the given index.
+ */
+ public org.apache.pekko.protobufv3.internal.ByteString getTagsBytes(int
index) {
+ return tags_.getByteString(index);
+ }
+
private byte memoizedIsInitialized = -1;
@java.lang.Override
@@ -824,6 +896,10 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
if (((bitField0_ & 0x00000400) != 0)) {
org.apache.pekko.protobufv3.internal.GeneratedMessage.writeString(output, 11,
source_);
}
+ for (int i = 0; i < tags_.size(); i++) {
+ org.apache.pekko.protobufv3.internal.GeneratedMessage.writeString(
+ output, 12, tags_.getRaw(i));
+ }
getUnknownFields().writeTo(output);
}
@@ -879,6 +955,14 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
size +=
org.apache.pekko.protobufv3.internal.GeneratedMessage.computeStringSize(11,
source_);
}
+ {
+ int dataSize = 0;
+ for (int i = 0; i < tags_.size(); i++) {
+ dataSize += computeStringSizeNoTag(tags_.getRaw(i));
+ }
+ size += dataSize;
+ size += 1 * getTagsList().size();
+ }
size += getUnknownFields().getSerializedSize();
memoizedSize = size;
return size;
@@ -941,6 +1025,7 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
if (hasSource()) {
if (!getSource().equals(other.getSource())) return false;
}
+ if (!getTagsList().equals(other.getTagsList())) return false;
if (!getUnknownFields().equals(other.getUnknownFields())) return false;
return true;
}
@@ -998,6 +1083,10 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
hash = (37 * hash) + SOURCE_FIELD_NUMBER;
hash = (53 * hash) + getSource().hashCode();
}
+ if (getTagsCount() > 0) {
+ hash = (37 * hash) + TAGS_FIELD_NUMBER;
+ hash = (53 * hash) + getTagsList().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -1191,6 +1280,8 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
}
filtered_ = false;
source_ = "";
+ tags_ = org.apache.pekko.protobufv3.internal.LazyStringArrayList.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000800);
return this;
}
@@ -1279,6 +1370,11 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
result.source_ = source_;
to_bitField0_ |= 0x00000400;
}
+ if (((from_bitField0_ & 0x00000800) != 0)) {
+ tags_ = tags_.getUnmodifiableView();
+ bitField0_ = (bitField0_ & ~0x00000800);
+ }
+ result.tags_ = tags_;
result.bitField0_ |= to_bitField0_;
}
@@ -1344,6 +1440,16 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
bitField0_ |= 0x00000400;
onChanged();
}
+ if (!other.tags_.isEmpty()) {
+ if (tags_.isEmpty()) {
+ tags_ = other.tags_;
+ bitField0_ = (bitField0_ & ~0x00000800);
+ } else {
+ ensureTagsIsMutable();
+ tags_.addAll(other.tags_);
+ }
+ onChanged();
+ }
this.mergeUnknownFields(other.getUnknownFields());
onChanged();
return this;
@@ -1468,6 +1574,13 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
bitField0_ |= 0x00000400;
break;
} // case 90
+ case 98:
+ {
+ java.lang.String s = input.readStringRequireUtf8();
+ ensureTagsIsMutable();
+ tags_.add(s);
+ break;
+ } // case 98
default:
{
if (!super.parseUnknownField(input, extensionRegistry, tag))
{
@@ -2394,6 +2507,128 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
return this;
}
+ private org.apache.pekko.protobufv3.internal.LazyStringList tags_ =
+ org.apache.pekko.protobufv3.internal.LazyStringArrayList.EMPTY;
+
+ private void ensureTagsIsMutable() {
+ if (!((bitField0_ & 0x00000800) != 0)) {
+ tags_ = new
org.apache.pekko.protobufv3.internal.LazyStringArrayList(tags_);
+ bitField0_ |= 0x00000800;
+ }
+ }
+
+ /**
+ * <code>repeated string tags = 12;</code>
+ *
+ * @return A list containing the tags.
+ */
+ public org.apache.pekko.protobufv3.internal.ProtocolStringList
getTagsList() {
+ return tags_.getUnmodifiableView();
+ }
+
+ /**
+ * <code>repeated string tags = 12;</code>
+ *
+ * @return The count of tags.
+ */
+ public int getTagsCount() {
+ return tags_.size();
+ }
+
+ /**
+ * <code>repeated string tags = 12;</code>
+ *
+ * @param index The index of the element to return.
+ * @return The tags at the given index.
+ */
+ public java.lang.String getTags(int index) {
+ return tags_.get(index);
+ }
+
+ /**
+ * <code>repeated string tags = 12;</code>
+ *
+ * @param index The index of the value to return.
+ * @return The bytes of the tags at the given index.
+ */
+ public org.apache.pekko.protobufv3.internal.ByteString getTagsBytes(int
index) {
+ return tags_.getByteString(index);
+ }
+
+ /**
+ * <code>repeated string tags = 12;</code>
+ *
+ * @param index The index to set the value at.
+ * @param value The tags to set.
+ * @return This builder for chaining.
+ */
+ public Builder setTags(int index, java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureTagsIsMutable();
+ tags_.set(index, value);
+ onChanged();
+ return this;
+ }
+
+ /**
+ * <code>repeated string tags = 12;</code>
+ *
+ * @param value The tags to add.
+ * @return This builder for chaining.
+ */
+ public Builder addTags(java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureTagsIsMutable();
+ tags_.add(value);
+ onChanged();
+ return this;
+ }
+
+ /**
+ * <code>repeated string tags = 12;</code>
+ *
+ * @param values The tags to add.
+ * @return This builder for chaining.
+ */
+ public Builder addAllTags(java.lang.Iterable<java.lang.String> values) {
+ ensureTagsIsMutable();
+
org.apache.pekko.protobufv3.internal.AbstractMessageLite.Builder.addAll(values,
tags_);
+ onChanged();
+ return this;
+ }
+
+ /**
+ * <code>repeated string tags = 12;</code>
+ *
+ * @return This builder for chaining.
+ */
+ public Builder clearTags() {
+ tags_ = org.apache.pekko.protobufv3.internal.LazyStringArrayList.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000800);
+ onChanged();
+ return this;
+ }
+
+ /**
+ * <code>repeated string tags = 12;</code>
+ *
+ * @param value The bytes of the tags to add.
+ * @return This builder for chaining.
+ */
+ public Builder
addTagsBytes(org.apache.pekko.protobufv3.internal.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureTagsIsMutable();
+ tags_.add(value);
+ onChanged();
+ return this;
+ }
+
//
@@protoc_insertion_point(builder_scope:org.apache.pekko.persistence.query.EventEnvelope)
}
@@ -2466,7 +2701,7 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
java.lang.String[] descriptorData = {
"\n"
+ "\023QueryMessages.proto\022\"org.apache.pekko."
- + "persistence.query\032\026ContainerFormats.proto\"\363\001\n\r"
+ + "persistence.query\032\026ContainerFormats.proto\"\201\002\n\r"
+ "EventEnvelope\022\026\n"
+ "\016persistence_id\030\001 \002(\t\022\023\n"
+ "\013entity_type\030\002 \002(\t\022\r\n"
@@ -2479,7 +2714,8 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
+ "\010metadata\030\t \001(\0132\010.Payload\022\020\n"
+ "\010filtered\030\n"
+ " \001(\010\022\016\n"
- + "\006source\030\013 \001(\tB8\n"
+ + "\006source\030\013 \001(\t\022\014\n"
+ + "\004tags\030\014 \003(\tB8\n"
+ "4org.apache.pekko.persistence.query.internal.protobufH\001"
};
descriptor =
@@ -2506,6 +2742,7 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
"Metadata",
"Filtered",
"Source",
+ "Tags",
});
descriptor.resolveAllFeaturesImmutable();
org.apache.pekko.remote.ContainerFormats.getDescriptor();
diff --git
a/persistence-query/src/main/mima-filters/2.0.x.backwards.excludes/typed-event-envelope-tags.excludes
b/persistence-query/src/main/mima-filters/2.0.x.backwards.excludes/typed-event-envelope-tags.excludes
new file mode 100644
index 0000000000..dc01137b4c
--- /dev/null
+++
b/persistence-query/src/main/mima-filters/2.0.x.backwards.excludes/typed-event-envelope-tags.excludes
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Internal protobuf classes
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.query.internal.protobuf.QueryMessages#EventEnvelopeOrBuilder.getTags")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.query.internal.protobuf.QueryMessages#EventEnvelopeOrBuilder.getTagsList")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.query.internal.protobuf.QueryMessages#EventEnvelopeOrBuilder.getTagsCount")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.query.internal.protobuf.QueryMessages#EventEnvelopeOrBuilder.getTagsBytes")
+
+# New constructors and methods in typed EventEnvelope
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.query.typed.EventEnvelope.this")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.query.typed.EventEnvelope.apply")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.query.typed.EventEnvelope.create")
diff --git a/persistence-query/src/main/protobuf/QueryMessages.proto
b/persistence-query/src/main/protobuf/QueryMessages.proto
index f3c3d192f7..02a3e66e65 100644
--- a/persistence-query/src/main/protobuf/QueryMessages.proto
+++ b/persistence-query/src/main/protobuf/QueryMessages.proto
@@ -32,4 +32,5 @@ message EventEnvelope {
optional Payload metadata = 9;
optional bool filtered = 10;
optional string source = 11;
+ repeated string tags = 12;
}
diff --git
a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/internal/QuerySerializer.scala
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/internal/QuerySerializer.scala
index 6e0538bf27..25ffdeabc1 100644
---
a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/internal/QuerySerializer.scala
+++
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/internal/QuerySerializer.scala
@@ -91,6 +91,11 @@ import pekko.serialization.Serializers
if (env.source.nonEmpty)
builder.setSource(env.source)
+ if (env.tags.nonEmpty) {
+ import scala.jdk.CollectionConverters._
+ builder.addAllTags(env.tags.asJava)
+ }
+
builder.build().toByteArray()
case offset: Offset =>
@@ -115,6 +120,12 @@ import pekko.serialization.Serializers
val filtered = env.hasFiltered && env.getFiltered
val source = if (env.hasSource) env.getSource else ""
+ val tags =
+ if (env.getTagsList.isEmpty) Set.empty[String]
+ else {
+ import scala.jdk.CollectionConverters._
+ env.getTagsList.iterator.asScala.toSet
+ }
new EventEnvelope(
offset,
@@ -126,7 +137,8 @@ import pekko.serialization.Serializers
env.getEntityType,
env.getSlice,
filtered,
- source)
+ source,
+ tags)
case _ =>
fromStorageRepresentation(new String(bytes, UTF_8), manifest)
diff --git
a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/EventEnvelope.scala
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/EventEnvelope.scala
index 551d46526d..23cdbea46d 100644
---
a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/EventEnvelope.scala
+++
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/EventEnvelope.scala
@@ -14,6 +14,7 @@
package org.apache.pekko.persistence.query.typed
import java.util.Optional
+import java.util.{ Set => JSet }
import org.apache.pekko
import pekko.annotation.ApiMayChange
@@ -21,6 +22,31 @@ import pekko.persistence.query.Offset
import pekko.util.HashCode
object EventEnvelope {
+
+ def apply[Event](
+ offset: Offset,
+ persistenceId: String,
+ sequenceNr: Long,
+ event: Event,
+ timestamp: Long,
+ entityType: String,
+ slice: Int,
+ filtered: Boolean,
+ source: String,
+ tags: Set[String]): EventEnvelope[Event] =
+ new EventEnvelope(
+ offset,
+ persistenceId,
+ sequenceNr,
+ Option(event),
+ timestamp,
+ None,
+ entityType,
+ slice,
+ filtered,
+ source,
+ tags)
+
def apply[Event](
offset: Offset,
persistenceId: String,
@@ -53,6 +79,21 @@ object EventEnvelope {
filtered,
source)
+ def create[Event](
+ offset: Offset,
+ persistenceId: String,
+ sequenceNr: Long,
+ event: Event,
+ timestamp: Long,
+ entityType: String,
+ slice: Int,
+ filtered: Boolean,
+ source: String,
+ tags: JSet[String]): EventEnvelope[Event] = {
+ import scala.jdk.CollectionConverters._
+ apply(offset, persistenceId, sequenceNr, event, timestamp, entityType,
slice, filtered, source, tags.asScala.toSet)
+ }
+
def create[Event](
offset: Offset,
persistenceId: String,
@@ -104,7 +145,32 @@ final class EventEnvelope[Event](
val entityType: String,
val slice: Int,
val filtered: Boolean,
- val source: String) {
+ val source: String,
+ val tags: Set[String]) {
+
+ def this(
+ offset: Offset,
+ persistenceId: String,
+ sequenceNr: Long,
+ eventOption: Option[Event],
+ timestamp: Long,
+ eventMetadata: Option[Any],
+ entityType: String,
+ slice: Int,
+ filtered: Boolean,
+ source: String) =
+ this(
+ offset,
+ persistenceId,
+ sequenceNr,
+ eventOption,
+ timestamp,
+ eventMetadata,
+ entityType,
+ slice,
+ filtered,
+ source,
+ tags = Set.empty)
def this(
offset: Offset,
@@ -152,6 +218,14 @@ final class EventEnvelope[Event](
eventMetadata.toJava.asInstanceOf[Optional[AnyRef]]
}
+ /**
+ * Java API:
+ */
+ def getTags(): JSet[String] = {
+ import scala.jdk.CollectionConverters._
+ tags.asJava
+ }
+
override def hashCode(): Int = {
var result = HashCode.SEED
result = HashCode.hash(result, offset)
@@ -164,7 +238,8 @@ final class EventEnvelope[Event](
case other: EventEnvelope[_] =>
offset == other.offset && persistenceId == other.persistenceId &&
sequenceNr == other.sequenceNr &&
eventOption == other.eventOption && timestamp == other.timestamp &&
eventMetadata == other.eventMetadata &&
- entityType == other.entityType && slice == other.slice && filtered ==
other.filtered
+ entityType == other.entityType && slice == other.slice && filtered ==
other.filtered &&
+ tags == other.tags
case _ => false
}
@@ -177,6 +252,7 @@ final class EventEnvelope[Event](
case Some(meta) => meta.getClass.getName
case None => ""
}
-
s"EventEnvelope($offset,$persistenceId,$sequenceNr,$eventStr,$timestamp,$metaStr,$entityType,$slice,$filtered,$source)"
+
s"EventEnvelope($offset,$persistenceId,$sequenceNr,$eventStr,$timestamp,$metaStr,$entityType,$slice,$filtered,$source,${tags
+ .mkString("[", ", ", "]")})"
}
}
diff --git
a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/javadsl/CurrentEventsByPersistenceIdTypedQuery.scala
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/javadsl/CurrentEventsByPersistenceIdTypedQuery.scala
new file mode 100644
index 0000000000..75e57df815
--- /dev/null
+++
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/javadsl/CurrentEventsByPersistenceIdTypedQuery.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2021-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.query.typed.javadsl
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.annotation.ApiMayChange
+import pekko.persistence.query.javadsl.ReadJournal
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.stream.javadsl.Source
+
+/**
+ * A plugin may optionally support this query by implementing this trait.
+ *
+ * API May Change
+ */
+@ApiMayChange
+trait CurrentEventsByPersistenceIdTypedQuery extends ReadJournal {
+
+ /**
+ * Same as [[EventsByPersistenceIdTypedQuery.eventsByPersistenceIdTyped]]
but the stream is completed
+ * immediately when it reaches the end of the "current" events. Events that
are stored after the
+ * query is started are not included in the stream.
+ */
+ def currentEventsByPersistenceIdTyped[Event](
+ persistenceId: String,
+ fromSequenceNr: Long,
+ toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed]
+
+}
diff --git
a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/javadsl/EventsByPersistenceIdTypedQuery.scala
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/javadsl/EventsByPersistenceIdTypedQuery.scala
new file mode 100644
index 0000000000..6e925e92d7
--- /dev/null
+++
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/javadsl/EventsByPersistenceIdTypedQuery.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2021-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.query.typed.javadsl
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.annotation.ApiMayChange
+import pekko.persistence.query.javadsl.ReadJournal
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.stream.javadsl.Source
+
+/**
+ * A plugin may optionally support this query by implementing this trait.
+ *
+ * API May Change
+ */
+@ApiMayChange
+trait EventsByPersistenceIdTypedQuery extends ReadJournal {
+
+ /**
+ * Query events for a specific `PersistenceId`.
+ *
+ * Events are emitted in the order they were stored. The stream also emits
events that are persisted
+ * after the query is started. The stream is not completed when it reaches
the end of the currently stored
+ * events, but it continues to push new events when new events are
persisted. Corresponding query that is
+ * completed when it reaches the end of the currently stored events is
provided by
+ *
[[CurrentEventsByPersistenceIdTypedQuery.currentEventsByPersistenceIdTyped]].
+ *
+ * The `fromSequenceNr` and `toSequenceNr` can be used to limit what
sequence numbers the returned stream
+ * will contain. Both sides are inclusive. `0` and `Long.MaxValue` are used
to signify no lower/upper bound.
+ */
+ def eventsByPersistenceIdTyped[Event](
+ persistenceId: String,
+ fromSequenceNr: Long,
+ toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed]
+
+}
diff --git
a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/scaladsl/CurrentEventsByPersistenceIdTypedQuery.scala
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/scaladsl/CurrentEventsByPersistenceIdTypedQuery.scala
new file mode 100644
index 0000000000..f07776d386
--- /dev/null
+++
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/scaladsl/CurrentEventsByPersistenceIdTypedQuery.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2021-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.query.typed.scaladsl
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.annotation.ApiMayChange
+import pekko.persistence.query.scaladsl.ReadJournal
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.stream.scaladsl.Source
+
+/**
+ * A plugin may optionally support this query by implementing this trait.
+ *
+ * API May Change
+ */
+@ApiMayChange
+trait CurrentEventsByPersistenceIdTypedQuery extends ReadJournal {
+
+ /**
+ * Same as [[EventsByPersistenceIdTypedQuery.eventsByPersistenceIdTyped]]
but the stream is completed
+ * immediately when it reaches the end of the "current" events. Events that
are stored after the
+ * query is started are not included in the stream.
+ */
+ def currentEventsByPersistenceIdTyped[Event](
+ persistenceId: String,
+ fromSequenceNr: Long,
+ toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed]
+
+}
diff --git
a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/scaladsl/EventsByPersistenceIdTypedQuery.scala
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/scaladsl/EventsByPersistenceIdTypedQuery.scala
new file mode 100644
index 0000000000..9e05b1751f
--- /dev/null
+++
b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/scaladsl/EventsByPersistenceIdTypedQuery.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2021-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.query.typed.scaladsl
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.annotation.ApiMayChange
+import pekko.persistence.query.scaladsl.ReadJournal
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.stream.scaladsl.Source
+
+/**
+ * A plugin may optionally support this query by implementing this trait.
+ *
+ * API May Change
+ */
+@ApiMayChange
+trait EventsByPersistenceIdTypedQuery extends ReadJournal {
+
+ /**
+ * Query events for a specific `PersistenceId`.
+ *
+ * Events are emitted in the order they were stored. The stream also emits
events that are persisted
+ * after the query is started. The stream is not completed when it reaches
the end of the currently stored
+ * events, but it continues to push new events when new events are
persisted. Corresponding query that is
+ * completed when it reaches the end of the currently stored events is
provided by
+ *
[[CurrentEventsByPersistenceIdTypedQuery.currentEventsByPersistenceIdTyped]].
+ *
+ * The `fromSequenceNr` and `toSequenceNr` can be used to limit what
sequence numbers the returned stream
+ * will contain. Both sides are inclusive. `0` and `Long.MaxValue` are used
to signify no lower/upper bound.
+ */
+ def eventsByPersistenceIdTyped[Event](
+ persistenceId: String,
+ fromSequenceNr: Long,
+ toSequenceNr: Long): Source[EventEnvelope[Event], NotUsed]
+
+}
diff --git
a/persistence-query/src/test/scala/org/apache/pekko/persistence/query/internal/QuerySerializerSpec.scala
b/persistence-query/src/test/scala/org/apache/pekko/persistence/query/internal/QuerySerializerSpec.scala
index 1d9268d226..8d0afba57f 100644
---
a/persistence-query/src/test/scala/org/apache/pekko/persistence/query/internal/QuerySerializerSpec.scala
+++
b/persistence-query/src/test/scala/org/apache/pekko/persistence/query/internal/QuerySerializerSpec.scala
@@ -91,6 +91,22 @@ class QuerySerializerSpec extends PekkoSpec {
source = "backtracking"))
}
+ "serialize EventEnvelope with source and tags" in {
+ verifySerialization(
+ new EventEnvelope(
+ Sequence(1L),
+ "TestEntity|id1",
+ 3L,
+ Some("event1"),
+ System.currentTimeMillis(),
+ None,
+ "TestEntity",
+ 5,
+ filtered = false,
+ source = "query",
+ tags = Set("tag1", "tag2")))
+ }
+
"serialize Sequence Offset" in {
verifySerialization(Sequence(0))
}
diff --git
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/PersistenceProbeImpl.scala
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/PersistenceProbeImpl.scala
index dbe95098b1..d7ddd69043 100644
---
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/PersistenceProbeImpl.scala
+++
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/PersistenceProbeImpl.scala
@@ -147,7 +147,7 @@ private[pekko] object PersistenceProbeImpl {
case Persist(event) =>
sequenceNr += 1
state = eventHandler(state, event)
- onEvent(event, sequenceNr, tagger(event))
+ onEvent(event, sequenceNr, tagger(state, event))
shouldSnapshot = shouldSnapshot || snapshotRequested(event)
sideEffect(sideEffects)
@@ -162,7 +162,7 @@ private[pekko] object PersistenceProbeImpl {
eventsWithSeqNrs.foreach {
case (event, seqNr) =>
// technically doesn't persist them atomically, but in tests
that shouldn't matter
- onEvent(event, seqNr, tagger(event))
+ onEvent(event, seqNr, tagger(state, event))
shouldSnapshot = shouldSnapshot || snapshotRequested(event)
}
diff --git
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/internal/TypedEventsByPersistenceIdStage.scala
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/internal/TypedEventsByPersistenceIdStage.scala
new file mode 100644
index 0000000000..70698a9f7f
--- /dev/null
+++
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/internal/TypedEventsByPersistenceIdStage.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2020-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.testkit.query.internal
+
+import java.time.Instant
+import java.time.temporal.ChronoUnit
+
+import org.apache.pekko
+import pekko.actor.ActorRef
+import pekko.annotation.InternalApi
+import pekko.persistence.Persistence
+import pekko.persistence.journal.Tagged
+import pekko.persistence.query.TimestampOffset
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.persistence.testkit.{ EventStorage, PersistenceTestKitPlugin }
+import pekko.persistence.typed.PersistenceId
+import pekko.stream.{ Attributes, Outlet, SourceShape }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic,
GraphStageLogicWithLogging, OutHandler }
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+final private[pekko] class TypedEventsByPersistenceIdStage[Event](
+ persistenceId: String,
+ fromSequenceNr: Long,
+ toSequenceNr: Long,
+ storage: EventStorage,
+ persistence: Persistence)
+ extends GraphStage[SourceShape[EventEnvelope[Event]]] {
+ val out: Outlet[EventEnvelope[Event]] =
Outlet("TypedEventsByPersistenceIdSource")
+ override def shape: SourceShape[EventEnvelope[Event]] = SourceShape(out)
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
{
+ new GraphStageLogicWithLogging(shape) with OutHandler {
+ private var currentSequenceNr = math.max(fromSequenceNr, 1)
+ private var stageActorRef: ActorRef = null
+ override def preStart(): Unit = {
+ stageActorRef = getStageActor(receiveNotifications).ref
+ materializer.system.eventStream.subscribe(stageActorRef,
classOf[PersistenceTestKitPlugin.Write])
+ }
+
+ private def receiveNotifications(in: (ActorRef, Any)): Unit = {
+ val (_, msg) = in
+ msg match {
+ case PersistenceTestKitPlugin.Write(pid, toSequenceNr) if pid ==
persistenceId =>
+ if (toSequenceNr >= currentSequenceNr) {
+ tryPush()
+ }
+ case _ =>
+ }
+ }
+
+ private def tryPush(): Unit = {
+ if (isAvailable(out)) {
+ val event = storage.tryRead(persistenceId, currentSequenceNr,
currentSequenceNr, 1)
+ log.debug("tryPush available. Query for {} {} result {}",
currentSequenceNr, currentSequenceNr, event)
+ event.headOption match {
+ case Some(pr) =>
+ val timestamp = Instant.ofEpochMilli(pr.timestamp)
+ val readTimestamp = Instant.now().truncatedTo(ChronoUnit.MICROS)
+ val slice = persistence.sliceForPersistenceId(persistenceId)
+ val entityType = PersistenceId.extractEntityType(persistenceId)
+ val tags = pr.payload match {
+ case Tagged(_, t) => t
+ case _ => Set.empty[String]
+ }
+ val payload = pr.payload match {
+ case Tagged(p, _) => p
+ case p => p
+ }
+ push(out,
+ EventEnvelope(
+ TimestampOffset(timestamp, readTimestamp,
Map(pr.persistenceId -> pr.sequenceNr)),
+ pr.persistenceId,
+ pr.sequenceNr,
+ payload.asInstanceOf[Event],
+ pr.timestamp,
+ entityType,
+ slice,
+ filtered = false,
+ source = "",
+ tags = tags))
+ if (currentSequenceNr == toSequenceNr) {
+ completeStage()
+ } else {
+ currentSequenceNr += 1
+ }
+ case None =>
+ }
+ } else {
+ log.debug("tryPush, no demand")
+ }
+ }
+
+ override def onPull(): Unit = {
+ tryPush()
+ }
+
+ setHandler(out, this)
+ }
+
+ }
+
+}
diff --git
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala
index 49f63684bb..15e11df159 100644
---
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala
+++
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala
@@ -26,7 +26,12 @@ import pekko.persistence.query.javadsl.{
ReadJournal
}
import pekko.persistence.query.typed
-import pekko.persistence.query.typed.javadsl.{ CurrentEventsBySliceQuery,
EventsBySliceQuery }
+import pekko.persistence.query.typed.javadsl.{
+ CurrentEventsByPersistenceIdTypedQuery,
+ CurrentEventsBySliceQuery,
+ EventsByPersistenceIdTypedQuery,
+ EventsBySliceQuery
+}
import pekko.persistence.testkit.query.scaladsl
import pekko.stream.javadsl.Source
@@ -41,7 +46,9 @@ final class PersistenceTestKitReadJournal(delegate:
scaladsl.PersistenceTestKitR
with CurrentEventsByTagQuery
with CurrentEventsBySliceQuery
with EventsByTagQuery
- with EventsBySliceQuery {
+ with EventsBySliceQuery
+ with EventsByPersistenceIdTypedQuery
+ with CurrentEventsByPersistenceIdTypedQuery {
override def eventsByPersistenceId(
persistenceId: String,
@@ -55,6 +62,18 @@ final class PersistenceTestKitReadJournal(delegate:
scaladsl.PersistenceTestKitR
toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
delegate.currentEventsByPersistenceId(persistenceId, fromSequenceNr,
toSequenceNr).asJava
+ override def eventsByPersistenceIdTyped[Event](
+ persistenceId: String,
+ fromSequenceNr: Long,
+ toSequenceNr: Long): Source[typed.EventEnvelope[Event], NotUsed] =
+ delegate.eventsByPersistenceIdTyped(persistenceId, fromSequenceNr,
toSequenceNr).asJava
+
+ override def currentEventsByPersistenceIdTyped[Event](
+ persistenceId: String,
+ fromSequenceNr: Long,
+ toSequenceNr: Long): Source[typed.EventEnvelope[Event], NotUsed] =
+ delegate.currentEventsByPersistenceIdTyped(persistenceId, fromSequenceNr,
toSequenceNr).asJava
+
override def currentEventsByTag(tag: String, offset: Offset):
Source[EventEnvelope, NotUsed] =
delegate.currentEventsByTag(tag, offset).asJava
diff --git
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala
index 73c7945160..4d7d6e8e70 100644
---
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala
+++
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala
@@ -8,10 +8,12 @@
*/
/*
- * Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2020-2023 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.testkit.query.scaladsl
+import java.time.Instant
+import java.time.temporal.ChronoUnit
import scala.annotation.nowarn
import scala.collection.immutable
@@ -23,6 +25,7 @@ import pekko.persistence.journal.Tagged
import pekko.persistence.query.{ EventEnvelope, Sequence }
import pekko.persistence.query.NoOffset
import pekko.persistence.query.Offset
+import pekko.persistence.query.TimestampOffset
import pekko.persistence.query.scaladsl.{
CurrentEventsByPersistenceIdQuery,
CurrentEventsByTagQuery,
@@ -32,13 +35,16 @@ import pekko.persistence.query.scaladsl.{
}
import pekko.persistence.query.scaladsl.EventsByTagQuery
import pekko.persistence.query.typed
+import
pekko.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery
import pekko.persistence.query.typed.scaladsl.CurrentEventsBySliceQuery
+import pekko.persistence.query.typed.scaladsl.EventsByPersistenceIdTypedQuery
import pekko.persistence.query.typed.scaladsl.EventsBySliceQuery
import pekko.persistence.testkit.EventStorage
import pekko.persistence.testkit.internal.InMemStorageExtension
import pekko.persistence.testkit.query.internal.EventsByPersistenceIdStage
import pekko.persistence.testkit.query.internal.EventsBySliceStage
import pekko.persistence.testkit.query.internal.EventsByTagStage
+import pekko.persistence.testkit.query.internal.TypedEventsByPersistenceIdStage
import pekko.persistence.typed.PersistenceId
import pekko.stream.scaladsl.Source
@@ -59,7 +65,9 @@ final class PersistenceTestKitReadJournal(system:
ExtendedActorSystem, @nowarn("
with CurrentEventsBySliceQuery
with PagedPersistenceIdsQuery
with EventsByTagQuery
- with EventsBySliceQuery {
+ with EventsBySliceQuery
+ with EventsByPersistenceIdTypedQuery
+ with CurrentEventsByPersistenceIdTypedQuery {
private val log = LoggerFactory.getLogger(getClass)
@@ -77,6 +85,17 @@ final class PersistenceTestKitReadJournal(system:
ExtendedActorSystem, @nowarn("
case payload => payload
}
+ private def tagsFor(payload: Any): Set[String] = payload match {
+ case Tagged(_, tags) => tags
+ case _ => Set.empty
+ }
+
+ private def timestampOffsetFor(pr: pekko.persistence.PersistentRepr):
TimestampOffset = {
+ val timestamp = Instant.ofEpochMilli(pr.timestamp)
+ val readTimestamp = Instant.now().truncatedTo(ChronoUnit.MICROS)
+ TimestampOffset(timestamp, readTimestamp, Map(pr.persistenceId ->
pr.sequenceNr))
+ }
+
override def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long = 0,
@@ -99,6 +118,35 @@ final class PersistenceTestKitReadJournal(system:
ExtendedActorSystem, @nowarn("
}
}
+ override def eventsByPersistenceIdTyped[Event](
+ persistenceId: String,
+ fromSequenceNr: Long = 0,
+ toSequenceNr: Long = Long.MaxValue): Source[typed.EventEnvelope[Event],
NotUsed] = {
+ Source.fromGraph(
+ new TypedEventsByPersistenceIdStage[Event](persistenceId,
fromSequenceNr, toSequenceNr, storage, persistence))
+ }
+
+ override def currentEventsByPersistenceIdTyped[Event](
+ persistenceId: String,
+ fromSequenceNr: Long = 0,
+ toSequenceNr: Long = Long.MaxValue): Source[typed.EventEnvelope[Event],
NotUsed] = {
+ val slice = persistence.sliceForPersistenceId(persistenceId)
+ val entityType = PersistenceId.extractEntityType(persistenceId)
+ Source(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr,
Long.MaxValue)).map { pr =>
+ typed.EventEnvelope(
+ timestampOffsetFor(pr),
+ persistenceId,
+ pr.sequenceNr,
+ unwrapTaggedPayload(pr.payload).asInstanceOf[Event],
+ pr.timestamp,
+ entityType,
+ slice,
+ filtered = false,
+ source = "",
+ tags = tagsFor(pr.payload))
+ }
+ }
+
override def currentEventsByTag(tag: String, offset: Offset = NoOffset):
Source[EventEnvelope, NotUsed] = {
offset match {
case NoOffset =>
@@ -134,15 +182,17 @@ final class PersistenceTestKitReadJournal(system:
ExtendedActorSystem, @nowarn("
})
Source(prs).map { pr =>
val slice = persistence.sliceForPersistenceId(pr.persistenceId)
- new typed.EventEnvelope[Event](
- Sequence(pr.sequenceNr),
+ typed.EventEnvelope(
+ timestampOffsetFor(pr),
pr.persistenceId,
pr.sequenceNr,
- Some(pr.payload.asInstanceOf[Event]),
+ unwrapTaggedPayload(pr.payload).asInstanceOf[Event],
pr.timestamp,
- pr.metadata,
entityType,
- slice)
+ slice,
+ filtered = false,
+ source = "",
+ tags = tagsFor(pr.payload))
}
}
diff --git
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/CurrentEventsBySlicesSpec.scala
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/CurrentEventsBySlicesSpec.scala
index ad4482667f..82365c3b15 100644
---
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/CurrentEventsBySlicesSpec.scala
+++
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/CurrentEventsBySlicesSpec.scala
@@ -21,6 +21,7 @@ import pekko.actor.typed.ActorRef
import pekko.persistence.Persistence
import pekko.persistence.query.NoOffset
import pekko.persistence.query.PersistenceQuery
+import pekko.persistence.query.TimestampOffset
import pekko.persistence.testkit.query.EventsByPersistenceIdSpec.Command
import pekko.persistence.testkit.query.EventsByPersistenceIdSpec.testBehavior
import pekko.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
@@ -71,6 +72,27 @@ class CurrentEventsBySlicesSpec
.futureValue
.map(_.event) should ===(Seq("evt-1", "evt-2", "evt-3", "evt-4",
"evt-5"))
}
+
+ "include tags in events by slices" in {
+ val probe = createTestProbe[Done]()
+ val ref1 = spawn(testBehavior("TagTest|pid-1"))
+ ref1 ! Command("tag-me-evt-1", probe.ref)
+ ref1 ! Command("evt-2", probe.ref)
+ probe.receiveMessages(2)
+ val ref2 = spawn(testBehavior("TagTest|pid-2"))
+ ref2 ! Command("evt-3", probe.ref)
+ ref2 ! Command("tag-me-evt-4", probe.ref)
+ probe.receiveMessages(2)
+
+ val result = queries
+ .currentEventsBySlices[String]("TagTest", 0,
Persistence(system).numberOfSlices - 1, NoOffset)
+ .runWith(Sink.seq)
+ .futureValue
+
+ result.head.offset shouldBe a[TimestampOffset]
+ result.map(e => (e.event, e.tags)) should ===(
+ Seq(("tag-me-evt-1", Set("tag")), ("evt-2", Set.empty), ("evt-3",
Set.empty), ("tag-me-evt-4", Set("tag"))))
+ }
}
}
diff --git
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/EventsByPersistenceIdSpec.scala
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/EventsByPersistenceIdSpec.scala
index 2865c65385..90f99321a3 100644
---
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/EventsByPersistenceIdSpec.scala
+++
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/EventsByPersistenceIdSpec.scala
@@ -20,6 +20,7 @@ import pekko.Done
import pekko.actor.testkit.typed.scaladsl.{ LogCapturing,
ScalaTestWithActorTestKit }
import pekko.actor.typed.ActorRef
import pekko.persistence.query.{ EventEnvelope, PersistenceQuery }
+import pekko.persistence.query.Sequence
import pekko.persistence.testkit.PersistenceTestKitPlugin
import pekko.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import pekko.persistence.typed.PersistenceId
@@ -49,7 +50,7 @@ object EventsByPersistenceIdSpec {
Effect.persist(command.evt).thenRun { _ =>
command.ack ! Done
},
- (state, _) => state)
+ (state, _) => state).withTagger(evt => if (evt.startsWith("tag-me-"))
Set("tag") else Set.empty)
}
}
@@ -126,7 +127,9 @@ class EventsByPersistenceIdSpec
val probe = src.runWith(TestSink[EventEnvelope]())
probe.request(5)
- probe.expectNext().timestamp should be > 0L
+ val envelope = probe.expectNext()
+ envelope.timestamp should be > 0L
+ envelope.offset shouldBe a[Sequence]
probe.expectNext().timestamp should be > 0L
probe.cancel()
}
@@ -146,4 +149,20 @@ class EventsByPersistenceIdSpec
probe.cancel()
}
}
+
+ "Persistent test kit query currentEventsByPersistenceId" must {
+ "include timestamp in EventEnvelope" in {
+ setup("n")
+
+ val src = queries.currentEventsByPersistenceId("n", 0L, Long.MaxValue)
+ val probe = src.runWith(TestSink[EventEnvelope]())
+
+ probe.request(5)
+ val envelope = probe.expectNext()
+ envelope.timestamp should be > 0L
+ envelope.offset shouldBe a[Sequence]
+ probe.expectNext().timestamp should be > 0L
+ probe.cancel()
+ }
+ }
}
diff --git
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/EventsByPersistenceIdTypedSpec.scala
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/EventsByPersistenceIdTypedSpec.scala
new file mode 100644
index 0000000000..9a1b56f8ba
--- /dev/null
+++
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/query/EventsByPersistenceIdTypedSpec.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2020-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.testkit.query
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorRef
+import pekko.persistence.Persistence
+import pekko.persistence.query.PersistenceQuery
+import pekko.persistence.query.TimestampOffset
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.persistence.testkit.PersistenceTestKitPlugin
+import pekko.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
+import pekko.persistence.typed.PersistenceId
+import pekko.persistence.typed.scaladsl.Effect
+import pekko.persistence.typed.scaladsl.EventSourcedBehavior
+import pekko.stream.scaladsl.Sink
+import pekko.stream.testkit.scaladsl.TestSink
+
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import com.typesafe.config.ConfigFactory
+
+object EventsByPersistenceIdTypedSpec {
+ val config = PersistenceTestKitPlugin.config.withFallback(
+ ConfigFactory.parseString("""
+ pekko.loglevel = DEBUG
+ pekko.loggers = ["org.apache.pekko.testkit.SilenceAllTestEventListener"]
+ pekko.persistence.testkit.events.serialize = off
+ """))
+
+ case class Command(evt: String, ack: ActorRef[Done])
+ case class State()
+
+ def testBehavior(persistenceId: String) = {
+ EventSourcedBehavior[Command, String, State](
+ PersistenceId.ofUniqueId(persistenceId),
+ State(),
+ (_, command) =>
+ Effect.persist(command.evt).thenRun { _ =>
+ command.ack ! Done
+ },
+ (state, _) => state).withTagger(evt => if (evt.startsWith("tag-me-"))
Set("tag") else Set.empty)
+ }
+
+}
+
+class EventsByPersistenceIdTypedSpec
+ extends ScalaTestWithActorTestKit(EventsByPersistenceIdTypedSpec.config)
+ with LogCapturing
+ with AnyWordSpecLike {
+ import EventsByPersistenceIdTypedSpec._
+
+ implicit val classic: pekko.actor.ActorSystem = system.classicSystem
+
+ val queries =
+
PersistenceQuery(system).readJournalFor[PersistenceTestKitReadJournal](PersistenceTestKitReadJournal.Identifier)
+
+ def setup(persistenceId: String): ActorRef[Command] = {
+ val probe = createTestProbe[Done]()
+ val ref = setupEmpty(persistenceId)
+ ref ! Command(s"$persistenceId-1", probe.ref)
+ ref ! Command(s"$persistenceId-2", probe.ref)
+ ref ! Command(s"$persistenceId-3", probe.ref)
+ probe.expectMessage(Done)
+ probe.expectMessage(Done)
+ probe.expectMessage(Done)
+ ref
+ }
+
+ def setupEmpty(persistenceId: String): ActorRef[Command] = {
+ spawn(testBehavior(persistenceId))
+ }
+
+ "Persistent test kit live query eventsByPersistenceIdTyped" must {
+ "find new events" in {
+ val ackProbe = createTestProbe[Done]()
+ val ref = setup("d")
+ val src = queries.eventsByPersistenceIdTyped[String]("d", 0L,
Long.MaxValue)
+ val probe = src.runWith(TestSink[EventEnvelope[String]]())
+ probe.request(5)
+ probe.expectNextN(3)
+
+ ref ! Command("tag-me-d-4", ackProbe.ref)
+ ackProbe.expectMessage(Done)
+
+ val envelope = probe.expectNext()
+ envelope.offset shouldBe a[TimestampOffset]
+ envelope.event should ===("tag-me-d-4")
+ envelope.tags should ===(Set("tag"))
+ envelope.filtered should ===(false)
+ envelope.source should ===("")
+ envelope.slice should ===(Persistence(system).sliceForPersistenceId("d"))
+
+ val currentResult =
+ queries.currentEventsByPersistenceIdTyped[String]("d", 0L,
Long.MaxValue).runWith(Sink.seq).futureValue
+ currentResult should have size 4
+ currentResult.last should ===(envelope)
+ }
+ }
+}
diff --git
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala
index c19a16fa75..6e3ad66614 100644
---
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala
+++
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala
@@ -553,6 +553,28 @@ class EventSourcedBehaviorSpec
events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1,
Incremented(1), 0L))
}
+ "tag events based on state" in {
+ val pid = nextPid()
+ val c = spawn(
+ Behaviors.setup[Command](ctx =>
+ counter(ctx, pid).withTaggerForState((state, _) =>
+ if (state.value <= 1) Set.empty
+ else Set("higher-than-one"))))
+ val replyProbe = TestProbe[State]()
+
+ c ! Increment
+ c ! GetValue(replyProbe.ref)
+ replyProbe.expectMessage(State(1, Vector(0)))
+
+ c ! Increment
+ c ! GetValue(replyProbe.ref)
+ replyProbe.expectMessage(State(2, Vector(0, 1)))
+
+ val events = queries.currentEventsByTag("higher-than-one",
Offset.noOffset).runWith(Sink.seq).futureValue
+ events should have size 1
+ events.head shouldEqual EventEnvelope(Sequence(2), pid.id, 2,
Incremented(1), 0L)
+ }
+
"handle scheduled message arriving before recovery completed " in {
val c = spawn(Behaviors.withTimers[Command] { timers =>
timers.startSingleTimer(Increment, 1.millis)
diff --git
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
index ded56b0bf5..5556f79f12 100644
---
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
+++
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala
@@ -76,7 +76,7 @@ class EventSourcedBehaviorWatchSpec
eventHandler = (state, evt) => state + evt,
WriterIdentity.newIdentity(),
pf,
- _ => Set.empty[String],
+ (_, _) => Set.empty[String],
NoOpEventAdapter.instance[String],
NoOpSnapshotAdapter.instance[String],
snapshotWhen = ConstantFun.scalaAnyThreeToFalse,
diff --git
a/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/tags-for-state.excludes
b/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/tags-for-state.excludes
new file mode 100644
index 0000000000..0537827a22
--- /dev/null
+++
b/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/tags-for-state.excludes
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# internal
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.typed.internal.Running#HandlingCommands.adaptEvent")
+# Bin incompatible, but no way around it because Java DSL is based on
inheritance, hopefully nobody made
+# a method like this
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.scaladsl.EventSourcedBehavior.withTaggerForState")
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
index 38b94166a0..f8670c5bc1 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala
@@ -61,7 +61,7 @@ private[pekko] final class BehaviorSetup[C, E, S](
val eventHandler: EventSourcedBehavior.EventHandler[S, E],
val writerIdentity: EventSourcedBehaviorImpl.WriterIdentity,
private val signalHandler: PartialFunction[(S, Signal), Unit],
- val tagger: E => Set[String],
+ val tagger: (S, E) => Set[String],
val eventAdapter: EventAdapter[E, Any],
val snapshotAdapter: SnapshotAdapter[S],
val snapshotWhen: (S, E, Long) => Boolean,
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
index 75ebbb1b43..060e10f4b2 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
@@ -111,7 +111,7 @@ private[pekko] final case class
EventSourcedBehaviorImpl[Command, Event, State](
snapshotPluginId: Option[String] = None,
journalPluginConfig: Option[Config] = None,
snapshotPluginConfig: Option[Config] = None,
- tagger: Event => Set[String] = (_: Event) => Set.empty[String],
+ tagger: (State, Event) => Set[String] = (_: State, _: Event) =>
Set.empty[String],
eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event],
snapshotAdapter: SnapshotAdapter[State] =
NoOpSnapshotAdapter.instance[State],
snapshotWhen: (State, Event, Long) => Boolean =
ConstantFun.scalaAnyThreeToFalse,
@@ -285,6 +285,9 @@ private[pekko] final case class
EventSourcedBehaviorImpl[Command, Event, State](
copy(retention = criteria)
override def withTagger(tagger: Event => Set[String]):
EventSourcedBehavior[Command, Event, State] =
+ copy(tagger = (_, event) => tagger(event))
+
+ override def withTaggerForState(tagger: (State, Event) => Set[String]):
EventSourcedBehavior[Command, Event, State] =
copy(tagger = tagger)
override def eventAdapter(adapter: EventAdapter[Event, _]):
EventSourcedBehavior[Command, Event, State] =
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
index a43eb278b1..e2c88ee1ab 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
@@ -501,7 +501,7 @@ private[pekko] object Running {
replication.setContext(recoveryRunning = false, event.originReplica,
concurrent = isConcurrent)
val stateAfterApply = state.applyEvent(setup, event.event)
- val eventToPersist = adaptEvent(event.event)
+ val eventToPersist = adaptEvent(stateAfterApply.state, event.event)
val eventAdapterManifest = setup.eventAdapter.manifest(event.event)
replication.clearContext()
@@ -547,7 +547,7 @@ private[pekko] object Running {
setup.replication.foreach(r => r.setContext(recoveryRunning = false,
r.replicaId, concurrent = false))
val stateAfterApply = state.applyEvent(setup, event)
- val eventToPersist = adaptEvent(event)
+ val eventToPersist = adaptEvent(stateAfterApply.state, event)
val eventAdapterManifest = setup.eventAdapter.manifest(event)
val newState2: RunningState[S, C] = setup.replication match {
@@ -619,7 +619,6 @@ private[pekko] object Running {
if (shouldSnapshotAfterPersist == NoSnapshot)
shouldSnapshotAfterPersist =
setup.shouldSnapshot(currentState.state, event, _currentSequenceNumber)
val evtManifest = setup.eventAdapter.manifest(event)
- val adaptedEvent = adaptEvent(event)
val eventMetadata = metadataTemplate match {
case Some(template) =>
val updatedVersion =
currentState.version.updated(template.originReplica.id, _currentSequenceNumber)
@@ -635,6 +634,8 @@ private[pekko] object Running {
currentState = currentState.applyEvent(setup, event)
+ val adaptedEvent = adaptEvent(currentState.state, event)
+
eventsToPersist = EventToPersist(adaptedEvent, evtManifest,
eventMetadata) :: eventsToPersist
}
@@ -699,8 +700,8 @@ private[pekko] object Running {
}
}
- def adaptEvent(event: E): Any = {
- val tags = setup.tagger(event)
+ def adaptEvent(state: S, event: E): Any = {
+ val tags = setup.tagger(state, event)
val adaptedEvent = setup.eventAdapter.toJournal(event)
if (tags.isEmpty)
adaptedEvent
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala
index 742ccceb11..811ea8465e 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/javadsl/EventSourcedBehavior.scala
@@ -178,10 +178,20 @@ abstract class EventSourcedBehavior[Command, Event,
State] private[pekko] (
def recovery: Recovery = Recovery.default
/**
- * The `tagger` function should give event tags, which will be used in
persistence query
+ * Return tags to store for the given event, the tags can then be used in
persistence query.
+ *
+ * If [[tagsFor(Event, State)]] is overridden this method is ignored.
*/
def tagsFor(@nowarn("msg=never used") event: Event): java.util.Set[String] =
Collections.emptySet()
+ /**
+ * Return tags to store for the given event and state, the tags can then be
used in persistence query.
+ * The state passed to the tagger allows for toggling a tag with one event
but keep all events after it tagged
+ * based on a property or the type of the state.
+ */
+ def tagsFor(@nowarn("msg=never used") state: State, event: Event):
java.util.Set[String] =
+ tagsFor(event)
+
/**
* Transform the event in another type before giving to the journal. Can be
used to wrap events
* in types Journals understand but is of a different type than `Event`.
@@ -207,9 +217,9 @@ abstract class EventSourcedBehavior[Command, Event, State]
private[pekko] (
: scaladsl.EventSourcedBehavior[Command, Event, State] = {
val snapshotWhen: (State, Event, Long) => Boolean = (state, event, seqNr)
=> shouldSnapshot(state, event, seqNr)
- val tagger: Event => Set[String] = { event =>
+ val tagger: (State, Event) => Set[String] = { (state, event) =>
import scala.jdk.CollectionConverters._
- val tags = tagsFor(event)
+ val tags = tagsFor(state, event)
if (tags.isEmpty) Set.empty
else tags.asScala.toSet
}
@@ -224,7 +234,7 @@ abstract class EventSourcedBehavior[Command, Event, State]
private[pekko] (
getClass)
.snapshotWhen(snapshotWhen)
.withRetention(retentionCriteria.asScala)
- .withTagger(tagger)
+ .withTaggerForState(tagger)
.eventAdapter(eventAdapter())
.snapshotAdapter(snapshotAdapter())
.withJournalPluginId(journalPluginId)
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala
index 692072a12d..23184348ff 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehavior.scala
@@ -204,6 +204,13 @@ object EventSourcedBehavior {
*/
def withTagger(tagger: Event => Set[String]): EventSourcedBehavior[Command,
Event, State]
+ /**
+ * The `tagger` function should give event tags, which will be used in
persistence query.
+ * The state passed to the tagger allows for toggling a tag with one event
but keep all events after it tagged
+ * based on a property or the type of the state.
+ */
+ def withTaggerForState(tagger: (State, Event) => Set[String]):
EventSourcedBehavior[Command, Event, State]
+
/**
* Transform the event to another type before giving to the journal. Can be
used to wrap events
* in types Journals understand but is of a different type than `Event`.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]