This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git
The following commit(s) were added to refs/heads/main by this push:
new 6df7e768 feat: optimized event tags table writes (#386)
6df7e768 is described below
commit 6df7e768a8dca7a1a6ad5c0f75fb439aa2be869e
Author: Andy(Jingzhang)Chen <[email protected]>
AuthorDate: Thu Apr 30 16:18:06 2026 +0800
feat: optimized event tags table writes (#386)
* feat: optimized event tags table writes
* docs: migration docs
* chore: fix mima filter issue
* chore: fmt issue
* chore: add apache header
* and new lines at EOF
* chore: reduce actor size
* fix: mysql, sql server tests issue
---------
Co-authored-by: PJ Fanning <[email protected]>
---
.../event-tags-insert-optimized.excludes | 28 +++
core/src/main/resources/reference.conf | 13 +-
.../main/resources/schema/h2/h2-create-schema.sql | 10 +-
.../schema/mariadb/mariadb-create-schema.sql | 10 +-
.../resources/schema/mysql/mysql-create-schema.sql | 17 +-
.../schema/mysql/mysql-event-tag-migration.sql | 40 +++
.../schema/oracle/oracle-create-schema.sql | 8 +-
.../schema/oracle/oracle-event-tag-migration.sql | 55 ++++
.../schema/postgres/postgres-create-schema.sql | 8 +-
.../postgres/postgresql-event-tag-migration.sql | 35 +++
.../schema/sqlserver/sqlserver-create-schema.sql | 17 +-
.../sqlserver/sqlserver-event-tag-migration.sql | 63 +++++
.../jdbc/config/PekkoPersistenceConfig.scala | 4 +
.../jdbc/journal/dao/JournalQueries.scala | 39 ++-
.../jdbc/journal/dao/JournalTables.scala | 15 +-
.../jdbc/query/dao/ReadJournalQueries.scala | 7 +-
core/src/test/resources/mysql-application.conf | 2 +-
.../mysql-explicit-select-application.conf | 2 +-
.../resources/mysql-shared-db-application.conf | 2 +-
...ysql-shared-db-explicit-select-application.conf | 2 +-
.../test/resources/oracle-schema-overrides.conf | 2 +
.../jdbc/query/CurrentEventsByTagTest.scala | 5 +-
.../jdbc/query/EventsByTagMigrationTest.scala | 278 +++++++++++++++++++++
docs/src/main/paradox/migration.md | 33 ++-
.../integration/EventsByTagMigrationTest.scala | 94 +++++++
.../jdbc/integration/MigrationScriptSpec.scala | 31 ++-
.../src/test/resources/mysql-application.conf | 2 +-
.../schema/h2/h2-create-schema-legacy.sql | 12 +
.../schema/mysql/mysql-create-schema-legacy.sql | 12 +
.../schema/oracle/oracle-create-schema-legacy.sql | 11 +
.../postgres/postgres-create-schema-legacy.sql | 12 +
.../sqlserver/sqlserver-create-schema-legacy.sql | 13 +
.../jdbc/migrator/JournalMigrator.scala | 7 +-
scripts/docker-compose.yml | 1 +
34 files changed, 825 insertions(+), 65 deletions(-)
diff --git
a/core/src/main/mima-filters/2.0.x.backwards.excludes/event-tags-insert-optimized.excludes
b/core/src/main/mima-filters/2.0.x.backwards.excludes/event-tags-insert-optimized.excludes
new file mode 100644
index 00000000..f272bc9d
--- /dev/null
+++
b/core/src/main/mima-filters/2.0.x.backwards.excludes/event-tags-insert-optimized.excludes
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Change the signature of EventTags related.
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.persistence.jdbc.journal.dao.JournalTables#EventTags.eventId")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.jdbc.journal.dao.JournalTables#TagRow.eventId")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.jdbc.journal.dao.JournalTables#TagRow.copy")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.jdbc.journal.dao.JournalTables#TagRow.copy$default$1")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.jdbc.journal.dao.JournalTables#TagRow.copy$default$2")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.jdbc.journal.dao.JournalTables#TagRow.this")
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.persistence.jdbc.journal.dao.JournalTables$TagRow$")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.jdbc.journal.dao.JournalTables#TagRow.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.persistence.jdbc.journal.dao.JournalTables#TagRow.unapply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.persistence.jdbc.journal.dao.JournalQueries.writeJournalRows")
diff --git a/core/src/main/resources/reference.conf
b/core/src/main/resources/reference.conf
index ba113166..6e7851e0 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -151,9 +151,16 @@ jdbc-journal {
schemaName = ""
columnNames {
+ # keep this column for the compatibility
eventId = "event_id"
+ persistenceId = "persistence_id"
+ sequenceNumber = "sequence_number"
tag = "tag"
}
+
+ # For rolling updates the event_tag table migration.
+ # switch those to enable new region key write and read.
+ legacy-tag-key = true
}
# Otherwise it would be a pinned dispatcher, see
https://github.com/akka/akka/issues/31058
@@ -450,7 +457,7 @@ jdbc-read-journal {
journal-sequence-retrieval {
# The maximum number of ids that will be retrieved in each batch
batch-size = 10000
- # In case a number in the sequence is missing, this is the ammount of
retries that will be done to see
+ # In case a number in the sequence is missing, this is the amount of
retries that will be done to see
# if the number is still found. Note that the time after which a number in
the sequence is assumed missing is
# equal to maxTries * queryDelay
# (maxTries may not be zero)
@@ -592,11 +599,11 @@ jdbc-durable-state-store {
}
}
- # Settings for determining if gloabal_offset column in the durable-state are
out of sequence.
+ # Settings for determining if global_offset column in the durable-state are
out of sequence.
durable-state-sequence-retrieval {
# The maximum number of ids that will be retrieved in each batch
batch-size = 10000
- # In case a number in the sequence is missing, this is the ammount of
retries that will be done to see
+ # In case a number in the sequence is missing, this is the amount of
retries that will be done to see
# if the number is still found. Note that the time after which a number in
the sequence is assumed missing is
# equal to maxTries * queryDelay
# (maxTries may not be zero)
diff --git a/core/src/main/resources/schema/h2/h2-create-schema.sql
b/core/src/main/resources/schema/h2/h2-create-schema.sql
index ee6d0c96..de030c54 100644
--- a/core/src/main/resources/schema/h2/h2-create-schema.sql
+++ b/core/src/main/resources/schema/h2/h2-create-schema.sql
@@ -18,12 +18,14 @@ CREATE TABLE IF NOT EXISTS PUBLIC."event_journal" (
CREATE UNIQUE INDEX "event_journal_ordering_idx" on PUBLIC."event_journal"
("ordering");
CREATE TABLE IF NOT EXISTS PUBLIC."event_tag" (
- "event_id" BIGINT NOT NULL,
+ "event_id" BIGINT,
+ "persistence_id" VARCHAR(255),
+ "sequence_number" BIGINT,
"tag" VARCHAR NOT NULL,
- PRIMARY KEY("event_id", "tag"),
+ PRIMARY KEY("persistence_id", "sequence_number", "tag"),
CONSTRAINT fk_event_journal
- FOREIGN KEY("event_id")
- REFERENCES "event_journal"("ordering")
+ FOREIGN KEY("persistence_id", "sequence_number")
+ REFERENCES "event_journal"("persistence_id", "sequence_number")
ON DELETE CASCADE
);
diff --git a/core/src/main/resources/schema/mariadb/mariadb-create-schema.sql
b/core/src/main/resources/schema/mariadb/mariadb-create-schema.sql
index 8f405920..7f2d20aa 100644
--- a/core/src/main/resources/schema/mariadb/mariadb-create-schema.sql
+++ b/core/src/main/resources/schema/mariadb/mariadb-create-schema.sql
@@ -20,11 +20,13 @@ CREATE UNIQUE INDEX event_journal_ordering_idx ON
event_journal (ordering);
CREATE TABLE IF NOT EXISTS event_tag
(
- event_id BIGINT UNSIGNED NOT NULL,
+ event_id BIGINT UNSIGNED,
+ persistence_id VARCHAR(255),
+ sequence_number BIGINT,
tag VARCHAR(255) NOT NULL,
- PRIMARY KEY (event_id, tag),
- FOREIGN KEY (event_id)
- REFERENCES event_journal (ordering)
+ PRIMARY KEY (persistence_id, sequence_number, tag),
+ FOREIGN KEY (persistence_id, sequence_number)
+ REFERENCES event_journal (persistence_id, sequence_number)
ON DELETE CASCADE
);
diff --git a/core/src/main/resources/schema/mysql/mysql-create-schema.sql
b/core/src/main/resources/schema/mysql/mysql-create-schema.sql
index 22da5edc..0e179597 100644
--- a/core/src/main/resources/schema/mysql/mysql-create-schema.sql
+++ b/core/src/main/resources/schema/mysql/mysql-create-schema.sql
@@ -17,13 +17,16 @@ CREATE TABLE IF NOT EXISTS event_journal(
CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering);
-CREATE TABLE IF NOT EXISTS event_tag (
- event_id BIGINT UNSIGNED NOT NULL,
- tag VARCHAR(255) NOT NULL,
- PRIMARY KEY(event_id, tag),
- FOREIGN KEY (event_id)
- REFERENCES event_journal(ordering)
- ON DELETE CASCADE
+CREATE TABLE IF NOT EXISTS event_tag
+(
+ event_id BIGINT UNSIGNED,
+ persistence_id VARCHAR(255),
+ sequence_number BIGINT,
+ tag VARCHAR(255) NOT NULL,
+ PRIMARY KEY (persistence_id, sequence_number, tag),
+ FOREIGN KEY (persistence_id, sequence_number)
+ REFERENCES event_journal (persistence_id, sequence_number)
+ ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS snapshot (
diff --git a/core/src/main/resources/schema/mysql/mysql-event-tag-migration.sql
b/core/src/main/resources/schema/mysql/mysql-event-tag-migration.sql
new file mode 100644
index 00000000..af68a0db
--- /dev/null
+++ b/core/src/main/resources/schema/mysql/mysql-event-tag-migration.sql
@@ -0,0 +1,40 @@
+-- **************** first step ****************
+-- add new column
+ALTER TABLE event_tag
+ ADD persistence_id VARCHAR(255),
+ ADD sequence_number BIGINT;
+
+
+-- **************** second step ****************
+-- migrate rows
+UPDATE event_tag
+ INNER JOIN event_journal
+ON event_tag.event_id = event_journal.ordering
+ SET event_tag.persistence_id = event_journal.persistence_id,
event_tag.sequence_number = event_journal.sequence_number;
+
+-- drop old FK constraint
+SELECT CONSTRAINT_NAME
+INTO @fk_constraint_name
+FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS
+WHERE TABLE_NAME = 'event_tag';
+
+SET @alter_query = CONCAT('ALTER TABLE event_tag DROP FOREIGN KEY ',
@fk_constraint_name);
+PREPARE stmt FROM @alter_query;
+EXECUTE stmt;
+DEALLOCATE PREPARE stmt;
+
+-- drop old PK constraint
+ALTER TABLE event_tag DROP PRIMARY KEY;
+-- create new PK constraint for PK column.
+ALTER TABLE event_tag
+ ADD CONSTRAINT
+ PRIMARY KEY (persistence_id, sequence_number, tag);
+-- create new FK constraint for PK column.
+ALTER TABLE event_tag
+ ADD CONSTRAINT fk_event_journal_on_pk
+ FOREIGN KEY (persistence_id, sequence_number)
+ REFERENCES event_journal (persistence_id, sequence_number)
+ ON DELETE CASCADE;
+-- alter the event_id to nullable, so we can skip the InsertAndReturn.
+ALTER TABLE event_tag
+ MODIFY COLUMN event_id BIGINT UNSIGNED NULL;
diff --git a/core/src/main/resources/schema/oracle/oracle-create-schema.sql
b/core/src/main/resources/schema/oracle/oracle-create-schema.sql
index 9ac5586f..f2951b07 100644
--- a/core/src/main/resources/schema/oracle/oracle-create-schema.sql
+++ b/core/src/main/resources/schema/oracle/oracle-create-schema.sql
@@ -23,10 +23,12 @@ CREATE OR REPLACE TRIGGER EVENT_JOURNAL__ORDERING_TRG
before insert on EVENT_JOU
/
CREATE TABLE EVENT_TAG (
- EVENT_ID NUMERIC NOT NULL,
+ EVENT_ID NUMERIC,
+ PERSISTENCE_ID VARCHAR(255),
+ SEQUENCE_NUMBER NUMERIC,
TAG VARCHAR(255) NOT NULL,
- PRIMARY KEY(EVENT_ID, TAG),
- FOREIGN KEY(EVENT_ID) REFERENCES EVENT_JOURNAL(ORDERING)
+ PRIMARY KEY(PERSISTENCE_ID, SEQUENCE_NUMBER, TAG),
+ FOREIGN KEY(PERSISTENCE_ID, SEQUENCE_NUMBER) REFERENCES
EVENT_JOURNAL(PERSISTENCE_ID, SEQUENCE_NUMBER)
ON DELETE CASCADE
)
/
diff --git
a/core/src/main/resources/schema/oracle/oracle-event-tag-migration.sql
b/core/src/main/resources/schema/oracle/oracle-event-tag-migration.sql
new file mode 100644
index 00000000..70f21ab3
--- /dev/null
+++ b/core/src/main/resources/schema/oracle/oracle-event-tag-migration.sql
@@ -0,0 +1,55 @@
+-- **************** first step ****************
+-- add new column
+ALTER TABLE EVENT_TAG
+ ADD (PERSISTENCE_ID VARCHAR2(255),
+ SEQUENCE_NUMBER NUMERIC);
+
+
+-- **************** second step ****************
+-- migrate rows
+UPDATE EVENT_TAG
+SET PERSISTENCE_ID = (SELECT PERSISTENCE_ID
+ FROM EVENT_JOURNAL
+ WHERE EVENT_TAG.EVENT_ID = EVENT_JOURNAL.ORDERING),
+ SEQUENCE_NUMBER = (SELECT SEQUENCE_NUMBER
+ FROM EVENT_JOURNAL
+ WHERE EVENT_TAG.EVENT_ID = EVENT_JOURNAL.ORDERING)
+
+-- drop old FK constraint
+DECLARE v_constraint_name VARCHAR2(255);
+BEGIN
+ SELECT CONSTRAINT_NAME
+ INTO v_constraint_name
+ FROM USER_CONSTRAINTS
+ WHERE TABLE_NAME = 'EVENT_TAG'
+ AND CONSTRAINT_TYPE = 'R';
+
+IF v_constraint_name IS NOT NULL THEN
+ EXECUTE IMMEDIATE 'ALTER TABLE EVENT_TAG DROP CONSTRAINT ' ||
v_constraint_name;
+END IF;
+
+COMMIT;
+EXCEPTION
+ WHEN OTHERS THEN
+ ROLLBACK;
+ RAISE;
+END;
+/
+
+-- drop old PK constraint
+ALTER TABLE EVENT_TAG DROP PRIMARY KEY;
+
+-- create new PK constraint for PK column.
+ALTER TABLE EVENT_TAG
+ ADD CONSTRAINT "pk_event_tag"
+ PRIMARY KEY (PERSISTENCE_ID, SEQUENCE_NUMBER, TAG);
+
+-- create new FK constraint for PK column.
+ALTER TABLE EVENT_TAG
+ ADD CONSTRAINT fk_EVENT_JOURNAL_on_pk
+ FOREIGN KEY (PERSISTENCE_ID, SEQUENCE_NUMBER)
+ REFERENCES EVENT_JOURNAL (PERSISTENCE_ID, SEQUENCE_NUMBER)
+ ON DELETE CASCADE;
+
+-- alter the EVENT_ID to nullable, so we can skip the InsertAndReturn.
+ALTER TABLE EVENT_TAG MODIFY EVENT_ID NULL;
diff --git a/core/src/main/resources/schema/postgres/postgres-create-schema.sql
b/core/src/main/resources/schema/postgres/postgres-create-schema.sql
index 7ae7e099..190dc966 100644
--- a/core/src/main/resources/schema/postgres/postgres-create-schema.sql
+++ b/core/src/main/resources/schema/postgres/postgres-create-schema.sql
@@ -23,11 +23,13 @@ CREATE UNIQUE INDEX event_journal_ordering_idx ON
public.event_journal(ordering)
CREATE TABLE IF NOT EXISTS public.event_tag(
event_id BIGINT,
+ persistence_id VARCHAR(255),
+ sequence_number BIGINT,
tag VARCHAR(256),
- PRIMARY KEY(event_id, tag),
+ PRIMARY KEY(persistence_id, sequence_number, tag),
CONSTRAINT fk_event_journal
- FOREIGN KEY(event_id)
- REFERENCES event_journal(ordering)
+ FOREIGN KEY(persistence_id, sequence_number)
+ REFERENCES event_journal(persistence_id, sequence_number)
ON DELETE CASCADE
);
diff --git
a/core/src/main/resources/schema/postgres/postgresql-event-tag-migration.sql
b/core/src/main/resources/schema/postgres/postgresql-event-tag-migration.sql
new file mode 100644
index 00000000..4c8e64bc
--- /dev/null
+++ b/core/src/main/resources/schema/postgres/postgresql-event-tag-migration.sql
@@ -0,0 +1,35 @@
+-- **************** first step ****************
+-- add new column
+ALTER TABLE public.event_tag
+ ADD persistence_id VARCHAR(255),
+ ADD sequence_number BIGINT;
+
+
+-- **************** second step ****************
+-- migrate rows
+UPDATE public.event_tag
+SET persistence_id = public.event_journal.persistence_id,
+ sequence_number = public.event_journal.sequence_number
+ FROM event_journal
+WHERE public.event_tag.event_id = public.event_journal.ordering;
+
+-- drop old FK constraint
+ALTER TABLE public.event_tag DROP CONSTRAINT "fk_event_journal";
+
+-- drop old PK constraint
+ALTER TABLE public.event_tag DROP CONSTRAINT "event_tag_pkey";
+
+-- create new PK constraint for PK column.
+ALTER TABLE public.event_tag
+ ADD CONSTRAINT "pk_event_tag"
+ PRIMARY KEY (persistence_id, sequence_number, tag);
+
+-- create new FK constraint for PK column.
+ALTER TABLE public.event_tag
+ ADD CONSTRAINT "fk_event_journal_on_pk"
+ FOREIGN KEY (persistence_id, sequence_number)
+ REFERENCES public.event_journal (persistence_id, sequence_number)
+ ON DELETE CASCADE;
+
+-- alter the event_id to nullable, so we can skip the InsertAndReturn.
+ALTER TABLE public.event_tag ALTER COLUMN event_id DROP NOT NULL;
diff --git
a/core/src/main/resources/schema/sqlserver/sqlserver-create-schema.sql
b/core/src/main/resources/schema/sqlserver/sqlserver-create-schema.sql
index 1116e03b..762b3bc4 100644
--- a/core/src/main/resources/schema/sqlserver/sqlserver-create-schema.sql
+++ b/core/src/main/resources/schema/sqlserver/sqlserver-create-schema.sql
@@ -17,14 +17,17 @@ CREATE TABLE event_journal(
CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering);
-CREATE TABLE event_tag (
- "event_id" BIGINT NOT NULL,
- "tag" NVARCHAR(255) NOT NULL
- PRIMARY KEY ("event_id","tag")
+CREATE TABLE event_tag
+(
+ "event_id" BIGINT,
+ "persistence_id" NVARCHAR(255),
+ "sequence_number" NUMERIC(10, 0),
+ "tag" NVARCHAR(255) NOT NULL
+ PRIMARY KEY ("persistence_id", "sequence_number","tag"),
constraint "fk_event_journal"
- foreign key("event_id")
- references "dbo"."event_journal"("ordering")
- on delete CASCADE
+ foreign key ("persistence_id", "sequence_number")
+ references "dbo"."event_journal" ("persistence_id",
"sequence_number")
+ on delete CASCADE
);
CREATE TABLE "snapshot" (
diff --git
a/core/src/main/resources/schema/sqlserver/sqlserver-event-tag-migration.sql
b/core/src/main/resources/schema/sqlserver/sqlserver-event-tag-migration.sql
new file mode 100644
index 00000000..4ee7bba1
--- /dev/null
+++ b/core/src/main/resources/schema/sqlserver/sqlserver-event-tag-migration.sql
@@ -0,0 +1,63 @@
+-- **************** first step ****************
+-- add new column
+ALTER TABLE event_tag
+ ADD persistence_id VARCHAR(255),
+ ADD sequence_number BIGINT;
+
+
+-- **************** second step ****************
+-- migrate rows
+UPDATE event_tag
+SET persistence_id = event_journal.persistence_id,
+ sequence_number = event_journal.sequence_number
+ FROM event_journal
+WHERE event_tag.event_id = event_journal.ordering;
+
+-- drop old FK constraint
+DECLARE @fkConstraintName NVARCHAR(MAX);
+DECLARE @dropFKConstraintQuery NVARCHAR(MAX);
+
+SELECT @fkConstraintName = CONSTRAINT_NAME
+FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS
+WHERE TABLE_NAME = 'event_tag'
+ AND CONSTRAINT_TYPE = 'FOREIGN KEY';
+
+IF @fkConstraintName IS NOT NULL
+BEGIN
+ SET @dropFKConstraintQuery = 'ALTER TABLE event_tag DROP CONSTRAINT '
+ QUOTENAME(@fkConstraintName);
+EXEC sp_executesql @dropFKConstraintQuery;
+END
+
+-- drop old PK constraint
+DECLARE @constraintName NVARCHAR(MAX);
+DECLARE @dropConstraintQuery NVARCHAR(MAX);
+
+SELECT @constraintName = CONSTRAINT_NAME
+FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS
+WHERE TABLE_NAME = 'event_tag'
+ AND CONSTRAINT_TYPE = 'PRIMARY KEY';
+
+IF @constraintName IS NOT NULL
+BEGIN
+ SET @dropConstraintQuery = 'ALTER TABLE event_tag DROP CONSTRAINT ' +
QUOTENAME(@constraintName);
+EXEC sp_executesql @dropConstraintQuery;
+END
+
+-- create new PK constraint for PK column.
+ALTER TABLE event_tag
+ALTER COLUMN persistence_id NVARCHAR(255) NOT NULL
+ALTER TABLE event_tag
+ALTER COLUMN sequence_number NUMERIC(10, 0) NOT NULL
+ALTER TABLE event_tag
+ ADD CONSTRAINT "pk_event_tag"
+ PRIMARY KEY (persistence_id, sequence_number, TAG);
+
+-- create new FK constraint for PK column.
+ALTER TABLE event_tag
+ ADD CONSTRAINT "fk_event_journal_on_pk"
+ FOREIGN KEY (persistence_id, sequence_number)
+ REFERENCES event_journal (persistence_id, sequence_number)
+ ON DELETE CASCADE;
+
+-- alter the event_id to nullable, so we can skip the InsertAndReturn.
+ALTER TABLE event_tag ALTER COLUMN event_id BIGINT NULL;
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/config/PekkoPersistenceConfig.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/config/PekkoPersistenceConfig.scala
index 288a4e0f..f9672731 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/config/PekkoPersistenceConfig.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/config/PekkoPersistenceConfig.scala
@@ -62,7 +62,10 @@ class EventJournalTableColumnNames(config: Config) {
class EventTagTableColumnNames(config: Config) {
private val cfg = config.getConfig("tables.event_tag.columnNames")
+ // keep this column for the compatibility, will not be used
val eventId: String = cfg.getString("eventId")
+ val persistenceId: String = cfg.getString("persistenceId")
+ val sequenceNumber: String = cfg.getString("sequenceNumber")
val tag: String = cfg.getString("tag")
}
@@ -85,6 +88,7 @@ class EventTagTableConfiguration(config: Config) {
private val cfg = config.getConfig("tables.event_tag")
val tableName: String = cfg.getString("tableName")
val schemaName: Option[String] = cfg.asStringOption("schemaName")
+ val legacyTagKey: Boolean = cfg.getBoolean("legacy-tag-key")
val columnNames: EventTagTableColumnNames = new
EventTagTableColumnNames(config)
}
class LegacySnapshotTableColumnNames(config: Config) {
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalQueries.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalQueries.scala
index 751e11c7..1236866d 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalQueries.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalQueries.scala
@@ -38,19 +38,32 @@ class JournalQueries(
val messagesQuery = Compiled(_messagesQuery _)
def writeJournalRows(xs: Seq[(JournalPekkoSerializationRow,
Set[String])])(implicit ec: ExecutionContext) = {
- val sorted = xs.sortBy(event => event._1.sequenceNumber)
- if (sorted.exists(_._2.nonEmpty)) {
- // only if there are any tags
- val (events, tags) = sorted.unzip
- for {
- ids <- insertAndReturn ++= events
- tagInserts = ids.zip(tags).flatMap { case (id, tags) => tags.map(tag
=> TagRow(id, tag)) }
- _ <- TagTableC ++= tagInserts
- } yield ()
- } else {
- // optimization avoid some work when not using tags
- val events = sorted.map(_._1)
- JournalTableC ++= events
+ val sorted = xs.sortBy(_._1.sequenceNumber)
+ val (events, tags) = sorted.unzip
+
+ def insertEvents(rows: Seq[JournalPekkoSerializationRow]):
DBIO[Option[Seq[Long]]] =
+ tagTableCfg.legacyTagKey match {
+ case true =>
+ (insertAndReturn ++= rows).map(e => Some(e))
+ case false =>
+ (JournalTableC ++= rows).map(_ => None)
+ }
+
+ insertEvents(events).flatMap {
+ // write optimized tags
+ case None if tags.exists(_.nonEmpty) =>
+ val tagInserts = sorted.map { case (e, tags) =>
+ tags.map(t => TagRow(None, Some(e.persistenceId),
Some(e.sequenceNumber), t))
+ }
+ TagTableC ++= tagInserts.flatten
+ // no tags, nothing more to do
+ case None => DBIO.successful(())
+ // legacy primary key
+ case Some(generatedIds) =>
+ val tagInserts = generatedIds.zip(sorted).flatMap { case (id, (e,
tags)) =>
+ tags.map(tag => TagRow(Some(id), Some(e.persistenceId),
Some(e.sequenceNumber), tag))
+ }
+ TagTableC ++= tagInserts
}
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalTables.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalTables.scala
index 06dd6fe8..ea90d7bb 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalTables.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/JournalTables.scala
@@ -43,7 +43,7 @@ object JournalTables {
def tupled = (apply _).tupled
}
- case class TagRow(eventId: Long, tag: String)
+ case class TagRow(eventId: Option[Long], persistenceId: Option[String],
sequenceNumber: Option[Long], tag: String)
object TagRow {
def tupled = (apply _).tupled
@@ -108,13 +108,18 @@ trait JournalTables {
lazy val JournalTable = new TableQuery(tag => new JournalEvents(tag))
class EventTags(_tableTag: Tag) extends Table[TagRow](_tableTag,
tagTableCfg.schemaName, tagTableCfg.tableName) {
- override def * = (eventId, tag).<>(TagRow.tupled, TagRow.unapply)
+ override def * = (eventId, persistenceId, sequenceNumber, tag) <>
(TagRow.tupled, TagRow.unapply)
- val eventId: Rep[Long] = column[Long](tagTableCfg.columnNames.eventId)
+ // allow null value insert.
+ val eventId: Rep[Option[Long]] =
column[Option[Long]](tagTableCfg.columnNames.eventId)
+ val persistenceId: Rep[Option[String]] =
column[Option[String]](tagTableCfg.columnNames.persistenceId)
+ val sequenceNumber: Rep[Option[Long]] =
column[Option[Long]](tagTableCfg.columnNames.sequenceNumber)
val tag: Rep[String] = column[String](tagTableCfg.columnNames.tag)
- val pk = primaryKey(s"${tagTableCfg.tableName}_pk", (eventId, tag))
- val journalEvent = foreignKey(s"fk_${journalTableCfg.tableName}", eventId,
JournalTable)(_.ordering)
+ val pk = primaryKey(s"${tagTableCfg.tableName}_pk", (persistenceId,
sequenceNumber, tag))
+ val journalEvent =
+ foreignKey(s"fk_${journalTableCfg.tableName}", (persistenceId,
sequenceNumber), JournalTable)(e =>
+ (Rep.Some(e.persistenceId), Rep.Some(e.sequenceNumber)))
}
lazy val TagTable = new TableQuery(tag => new EventTags(tag))
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalQueries.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalQueries.scala
index cbe11d03..8b71d6e4 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalQueries.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/dao/ReadJournalQueries.scala
@@ -40,8 +40,11 @@ class ReadJournalQueries(val profile: JdbcProfile, val
readJournalConfig: ReadJo
private def _allPersistenceIdsDistinct(max: ConstColumn[Long]):
Query[Rep[String], String, Seq] =
baseTableQuery().map(_.persistenceId).distinct.take(max)
- private def baseTableWithTagsQuery() = {
- baseTableQuery().join(TagTable).on(_.ordering === _.eventId)
+ private def baseTableWithTagsQuery() = tagTableCfg.legacyTagKey match {
+ case true => baseTableQuery().join(TagTable).on(_.ordering === _.eventId)
+ case false => baseTableQuery()
+ .join(TagTable)
+ .on((e, t) => e.persistenceId === t.persistenceId && e.sequenceNumber
=== t.sequenceNumber)
}
private def _lastPersistenceIdSequenceNumberQuery(persistenceId:
Rep[String]) =
diff --git a/core/src/test/resources/mysql-application.conf
b/core/src/test/resources/mysql-application.conf
index 81c4915b..14b3ce9c 100644
--- a/core/src/test/resources/mysql-application.conf
+++ b/core/src/test/resources/mysql-application.conf
@@ -54,7 +54,7 @@ slick {
db {
host = ${docker.host}
host = ${?DB_HOST}
- url =
"jdbc:mysql://"${slick.db.host}":3306/mysql?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true"
+ url =
"jdbc:mysql://"${slick.db.host}":3306/pekko_persistence_jdbc?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true"
user = "root"
password = "root"
driver = "com.mysql.cj.jdbc.Driver"
diff --git a/core/src/test/resources/mysql-explicit-select-application.conf
b/core/src/test/resources/mysql-explicit-select-application.conf
index 0e25837a..a019b458 100644
--- a/core/src/test/resources/mysql-explicit-select-application.conf
+++ b/core/src/test/resources/mysql-explicit-select-application.conf
@@ -60,7 +60,7 @@ slick {
db {
host = ${docker.host}
host = ${?DB_HOST}
- url =
"jdbc:mysql://"${slick.db.host}":3306/mysql?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true"
+ url =
"jdbc:mysql://"${slick.db.host}":3306/pekko_persistence_jdbc?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true"
user = "root"
password = "root"
driver = "com.mysql.cj.jdbc.Driver"
diff --git a/core/src/test/resources/mysql-shared-db-application.conf
b/core/src/test/resources/mysql-shared-db-application.conf
index e0327779..cab869c1 100644
--- a/core/src/test/resources/mysql-shared-db-application.conf
+++ b/core/src/test/resources/mysql-shared-db-application.conf
@@ -36,7 +36,7 @@ pekko-persistence-jdbc {
db {
host = ${docker.host}
host = ${?DB_HOST}
- url =
"jdbc:mysql://"${pekko-persistence-jdbc.shared-databases.slick.db.host}":3306/mysql?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true"
+ url =
"jdbc:mysql://"${pekko-persistence-jdbc.shared-databases.slick.db.host}":3306/pekko_persistence_jdbc?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true"
user = "root"
password = "root"
driver = "com.mysql.cj.jdbc.Driver"
diff --git
a/core/src/test/resources/mysql-shared-db-explicit-select-application.conf
b/core/src/test/resources/mysql-shared-db-explicit-select-application.conf
index b1248774..15b7a944 100644
--- a/core/src/test/resources/mysql-shared-db-explicit-select-application.conf
+++ b/core/src/test/resources/mysql-shared-db-explicit-select-application.conf
@@ -37,7 +37,7 @@ pekko-persistence-jdbc {
db {
host = ${docker.host}
host = ${?DB_HOST}
- url =
"jdbc:mysql://"${pekko-persistence-jdbc.shared-databases.slick.db.host}":3306/mysql?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true"
+ url =
"jdbc:mysql://"${pekko-persistence-jdbc.shared-databases.slick.db.host}":3306/pekko_persistence_jdbc?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true"
user = "root"
password = "root"
driver = "com.mysql.cj.jdbc.Driver"
diff --git a/core/src/test/resources/oracle-schema-overrides.conf
b/core/src/test/resources/oracle-schema-overrides.conf
index a064d710..ce94a93b 100644
--- a/core/src/test/resources/oracle-schema-overrides.conf
+++ b/core/src/test/resources/oracle-schema-overrides.conf
@@ -65,6 +65,8 @@ jdbc-journal {
columnNames {
eventId = "EVENT_ID"
+ persistenceId = "PERSISTENCE_ID"
+ sequenceNumber = "SEQUENCE_NUMBER"
tag = "TAG"
}
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByTagTest.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByTagTest.scala
index a274ba22..74485f44 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByTagTest.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByTagTest.scala
@@ -196,9 +196,8 @@ abstract class CurrentEventsByTagTest(config: String)
extends QueryTestSpec(conf
// send a batch of 3 * 200
val batch1 = sendMessagesWithTag(tag, 200)
// Try to persist a large batch of events per actor. Some of these may
be returned, but not all!
- // Reduced as we can no longer do a batch insert due to the insert
returning the ordering
- // so trying to persist 1000s in a batch is slower
- val batch2 = sendMessagesWithTag(tag, 2000)
+ // Keep this below the test timeout: in legacy-tag-key mode writes
still depend on returning generated orderings.
+ val batch2 = sendMessagesWithTag(tag, 3000)
// wait for acknowledgement of the first batch only
batch1.futureValue
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/EventsByTagMigrationTest.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/EventsByTagMigrationTest.scala
new file mode 100644
index 00000000..f06e4974
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/EventsByTagMigrationTest.scala
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.query
+
+import com.typesafe.config.{ ConfigFactory, ConfigValue, ConfigValueFactory }
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.pattern.ask
+import org.apache.pekko.persistence.jdbc.query.EventsByTagMigrationTest.{
+ legacyTagKeyConfigOverride,
+ migrationConfigOverride
+}
+import org.apache.pekko.persistence.query.{ EventEnvelope, Sequence }
+
+import scala.concurrent.duration._
+
+object EventsByTagMigrationTest {
+ val maxBufferSize = 20
+ val refreshInterval = 500.milliseconds
+ val legacyTagKey = true
+
+ val legacyTagKeyConfigOverride: Map[String, ConfigValue] = Map(
+ "jdbc-read-journal.max-buffer-size" ->
ConfigValueFactory.fromAnyRef(maxBufferSize.toString),
+ "jdbc-read-journal.refresh-interval" ->
ConfigValueFactory.fromAnyRef(refreshInterval.toString),
+ "jdbc-journal.tables.event_tag.legacy-tag-key" ->
ConfigValueFactory.fromAnyRef(legacyTagKey))
+
+ val migrationConfigOverride: Map[String, ConfigValue] = Map(
+ "jdbc-read-journal.max-buffer-size" ->
ConfigValueFactory.fromAnyRef(maxBufferSize.toString),
+ "jdbc-read-journal.refresh-interval" ->
ConfigValueFactory.fromAnyRef(refreshInterval.toString))
+}
+
+abstract class EventsByTagMigrationTest(config: String) extends
QueryTestSpec(config, migrationConfigOverride) {
+ final val NoMsgTime: FiniteDuration = 100.millis
+
+ val tagTableCfg = journalConfig.eventTagTableConfiguration
+ val journalTableCfg = journalConfig.eventJournalTableConfiguration
+ val joinSQL: String =
+ s"JOIN ${journalTableName} ON
${tagTableCfg.tableName}.${tagTableCfg.columnNames.eventId} =
${journalTableName}.${journalTableCfg.columnNames.ordering}"
+ val fromSQL: String =
+ s"FROM ${journalTableName} WHERE
${tagTableCfg.tableName}.${tagTableCfg.columnNames.eventId} =
${journalTableName}.${journalTableCfg.columnNames.ordering}"
+
+ def dropConstraint(
+ tableName: String = tagTableCfg.tableName,
+ constraintTableName: String = "INFORMATION_SCHEMA.TABLE_CONSTRAINTS",
+ constraintType: String,
+ constraintDialect: String = "CONSTRAINT",
+ constraintNameDialect: String = ""): Unit = {
+ withStatement { stmt =>
+ // SELECT AND DROP old CONSTRAINT
+ val constraintNameQuery =
+ s"""
+ |SELECT CONSTRAINT_NAME
+ |FROM $constraintTableName
+ |WHERE TABLE_NAME = '$tableName' AND CONSTRAINT_TYPE =
'$constraintType'
+ """.stripMargin
+ val resultSet = stmt.executeQuery(constraintNameQuery)
+ if (resultSet.next()) {
+ val constraintName = resultSet.getString("CONSTRAINT_NAME")
+ stmt.execute(s"ALTER TABLE $tableName DROP $constraintDialect
$constraintName $constraintNameDialect")
+ }
+ }
+ }
+
+ def addPKConstraint(
+ tableName: String = tagTableCfg.tableName,
+ pidColumnName: String = tagTableCfg.columnNames.persistenceId,
+ seqNrColumnName: String = tagTableCfg.columnNames.sequenceNumber,
+ tagColumnName: String = tagTableCfg.columnNames.tag,
+ constraintNameDialect: String = "pk_event_tag"): Unit = {
+ withStatement { stmt =>
+ stmt.execute(s"""
+ |ALTER TABLE $tableName
+ |ADD CONSTRAINT $constraintNameDialect
+ |PRIMARY KEY ($pidColumnName, $seqNrColumnName,
$tagColumnName)
+ """.stripMargin)
+ }
+ }
+
+ def addFKConstraint(
+ tableName: String = tagTableCfg.tableName,
+ pidColumnName: String = tagTableCfg.columnNames.persistenceId,
+ seqNrColumnName: String = tagTableCfg.columnNames.sequenceNumber,
+ journalTableName: String = journalTableCfg.tableName,
+ journalPidColumnName: String = tagTableCfg.columnNames.persistenceId,
+ journalSeqNrColumnName: String = tagTableCfg.columnNames.sequenceNumber,
+ constraintNameDialect: String = "fk_event_journal_on_pk"): Unit = {
+ withStatement { stmt =>
+ stmt.execute(s"""
+ |ALTER TABLE $tableName
+ |ADD CONSTRAINT $constraintNameDialect
+ |FOREIGN KEY ($pidColumnName, $seqNrColumnName)
+ |REFERENCES $journalTableName ($journalPidColumnName,
$journalSeqNrColumnName)
+ |ON DELETE CASCADE
+ """.stripMargin)
+ }
+ }
+
+ def alterColumn(
+ tableName: String = tagTableCfg.tableName,
+ alterDialect: String = "ALTER COLUMN",
+ columnName: String = tagTableCfg.columnNames.eventId,
+ changeToDialect: String = "BIGINT NULL"): Unit = {
+ withStatement { stmt =>
+ stmt.execute(s"ALTER TABLE $tableName $alterDialect $columnName
$changeToDialect")
+ }
+ }
+
+ def fillNewColumn(
+ joinDialect: String = "",
+ pidSetDialect: String =
+ s"${tagTableCfg.columnNames.persistenceId} =
${journalTableName}.${journalTableCfg.columnNames.persistenceId}",
+ seqNrSetDialect: String =
+ s"${tagTableCfg.columnNames.sequenceNumber} =
${journalTableName}.${journalTableCfg.columnNames.sequenceNumber}",
+ fromDialect: String = ""): Unit = {
+ withStatement { stmt =>
+ stmt.execute(s"""
+ |UPDATE ${tagTableCfg.tableName} ${joinDialect}
+ |SET ${pidSetDialect},
+ |${seqNrSetDialect}
+ |${fromDialect}""".stripMargin)
+ }
+ }
+
+ /**
+ * add new column to event_tag table.
+ */
+ def addNewColumn(): Unit = {}
+
+ /**
+ * fill new column for exists rows.
+ */
+ def migrateLegacyRows(): Unit = {
+ fillNewColumn(fromDialect = fromSQL);
+ }
+
+ /**
+ * drop old FK constraint
+ */
+ def dropLegacyFKConstraint(): Unit =
+ dropConstraint(constraintType = "FOREIGN KEY")
+
+ /**
+ * drop old PK constraint
+ */
+ def dropLegacyPKConstraint(): Unit =
+ dropConstraint(constraintType = "PRIMARY KEY")
+
+ /**
+ * create new PK constraint for PK column.
+ */
+ def addNewPKConstraint(): Unit =
+ addPKConstraint()
+
+ /**
+ * create new FK constraint for PK column.
+ */
+ def addNewFKConstraint(): Unit =
+ addFKConstraint()
+
+ // override this, so we can reset the value.
+ def withRollingUpdateActorSystem(f: ActorSystem => Unit): Unit = {
+ val legacyTagKeyConfig =
legacyTagKeyConfigOverride.foldLeft(ConfigFactory.load(config)) {
+ case (conf, (path, configValue)) =>
+ conf.withValue(path, configValue)
+ }
+
+ implicit val system: ActorSystem = ActorSystem("migrator-test",
legacyTagKeyConfig)
+ f(system)
+ system.terminate().futureValue
+ }
+
+ it should "migrate event tag to new way" in {
+ // 1. Mock legacy tag column on here, but actually using new tag write.
+ withRollingUpdateActorSystem { implicit system =>
+ val journalOps = new ScalaJdbcReadJournalOperations(system)
+ withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
+ (actor1 ? withTags(1, "number")).futureValue
+ (actor2 ? withTags(2, "number")).futureValue
+ (actor3 ? withTags(3, "number")).futureValue
+
+ journalOps.withEventsByTag()("number", Sequence(Long.MinValue)) { tp =>
+ tp.request(Int.MaxValue)
+ tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp =
0L))
+ tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2, timestamp =
0L))
+ tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3, timestamp =
0L))
+ tp.cancel()
+ }
+ }(system)
+ }
+
+ // Assume that the user could alter table for the addition of the new
column manually, then we don't need to maintain
+ // the legacy table schema creation.
+ if (newDao) {
+ addNewColumn();
+ migrateLegacyRows();
+ }
+
+ // 2. write and read redundancy
+ withRollingUpdateActorSystem { implicit system =>
+ val journalOps = new ScalaJdbcReadJournalOperations(system)
+ withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
+ (actor1 ? withTags(4, "number")).futureValue
+ (actor2 ? withTags(5, "number")).futureValue
+ (actor3 ? withTags(6, "number")).futureValue
+ // Delay events that have not yet been projected can still be read.
+ journalOps.withEventsByTag()("number", Sequence(Long.MinValue)) { tp =>
+ tp.request(Int.MaxValue)
+ tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp =
0L))
+ tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2, timestamp =
0L))
+ tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3, timestamp =
0L))
+ tp.expectNext(EventEnvelope(Sequence(4), "my-1", 2, 4, timestamp =
0L))
+ tp.expectNext(EventEnvelope(Sequence(5), "my-2", 2, 5, timestamp =
0L))
+ tp.expectNext(EventEnvelope(Sequence(6), "my-3", 2, 6, timestamp =
0L))
+ tp.cancel()
+ }
+ }(system)
+ }
+
+ // 3. Migrate the old constraints so that we can change read and write
from the new PK.
+ if (newDao) {
+ dropLegacyFKConstraint();
+ dropLegacyPKConstraint()
+ addNewPKConstraint()
+ addNewFKConstraint()
+ }
+
+ // 4. check the migration completed.
+ withActorSystem { implicit system =>
+ val journalOps = new ScalaJdbcReadJournalOperations(system)
+ withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
+ (actor1 ? withTags(7, "number")).futureValue
+ (actor2 ? withTags(8, "number")).futureValue
+ (actor3 ? withTags(9, "number")).futureValue
+
+ journalOps.withEventsByTag()("number", Sequence(3)) { tp =>
+ tp.request(Int.MaxValue)
+ tp.expectNext(EventEnvelope(Sequence(4), "my-1", 2, 4, timestamp =
0L))
+ tp.expectNext(EventEnvelope(Sequence(5), "my-2", 2, 5, timestamp =
0L))
+ tp.expectNext(EventEnvelope(Sequence(6), "my-3", 2, 6, timestamp =
0L))
+ tp.expectNext(EventEnvelope(Sequence(7), "my-1", 3, 7, timestamp =
0L))
+ tp.expectNext(EventEnvelope(Sequence(8), "my-2", 3, 8, timestamp =
0L))
+ tp.expectNext(EventEnvelope(Sequence(9), "my-3", 3, 9, timestamp =
0L))
+ tp.cancel()
+ }
+
+ }(system)
+ }
+ }
+}
+
+class H2ScalaEventsByTagMigrationTest extends
EventsByTagMigrationTest("h2-application.conf") with H2Cleaner {
+
+ override def migrateLegacyRows(): Unit = {
+ fillNewColumn(
+ pidSetDialect = s"""${tagTableCfg.columnNames.persistenceId} = (
+ | SELECT
${journalTableCfg.columnNames.persistenceId}
+ | ${fromSQL}
+ |)""".stripMargin,
+ seqNrSetDialect = s"""${tagTableCfg.columnNames.sequenceNumber} = (
+ | SELECT
${journalTableCfg.columnNames.sequenceNumber}
+ | ${fromSQL}
+ |)""".stripMargin)
+ }
+}
diff --git a/docs/src/main/paradox/migration.md
b/docs/src/main/paradox/migration.md
index 300d0d9e..50c5942f 100644
--- a/docs/src/main/paradox/migration.md
+++ b/docs/src/main/paradox/migration.md
@@ -1,9 +1,38 @@
-# Migration from Akka Persistence JDBC to Pekko Persistence JDBC 1.0.x/1.1.x
+# Migration
* If you are looking to migrate from [Akka Persistence
JDBC](https://doc.akka.io/docs/akka-persistence-jdbc/current/migration.html),
you should upgrade to 5.1.x before attempting to migrate to Pekko's equivalent.
* If you are using a newer version of Akka Persistence JDBC, it might be best
to compare your table definitions with the Pekko table definitions of the
version of Pekko Persistence JDBC that you intend to migrate to. It is possible
that Akka have added changes that are not compatible with Pekko supports.
* The [Pekko Migration
Guide](https://pekko.apache.org/docs/pekko/1.0/project/migration-guides.html)
is a good summary of the changes that you need to make when switching from Akka
to Pekko.
-# Migrating to 1.2.x
+## Migrating to 1.2.x
It is recommended that you read the section about DB Schema Changes in the
@ref[1.2.0 Release Notes](release-notes/releases-1.2.md).
+
+## Migrating to version 2.0.0
+
+Release `2.0.0` changes the schema of the `event_tag` table.
+
+The previous version was using an auto-increment column as a primary key and
foreign key on the `event_tag` table. As a result, the insert of multiple
events in batch was not performant.
+
+While in `2.0.0`, the primary key and foreign key on the `event_tag` table
have been replaced with a primary key from the `event_journal` table. In order
to migrate to the new schema, we made a [**migration
script**](https://github.com/apache/pekko-persistence-jdbc/tree/master/core/src/main/resources/schema)
which is capable of creating the new column, migrating the rows and adding the
new constraints.
+
+By default, the plugin will behave as in previous version. If you want to use
the new `event_tag` keys, you need to run a multiple-phase rollout:
+
+1. apply the first part of the migration script and then redeploy your
application with the default settings.
+2. apply the second part of the migration script that will migrate the rows
and adapt the constraints.
+3. redeploy the application by disabling the legacy-mode:
+
+```config
+jdbc-journal {
+ tables {
+ // ...
+ event_tag {
+ // ...
+ // enable the new tag key
+ legacy-tag-key = false
+ }
+ }
+}
+// or simply configure via flattened style
+jdbc-journal.tables.event_tag.legacy-tag-key = false
+```
diff --git
a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/EventsByTagMigrationTest.scala
b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/EventsByTagMigrationTest.scala
new file mode 100644
index 00000000..b58972d0
--- /dev/null
+++
b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/EventsByTagMigrationTest.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.integration
+
+import org.apache.pekko.persistence.jdbc.query.{
+ EventsByTagMigrationTest,
+ MysqlCleaner,
+ OracleCleaner,
+ PostgresCleaner,
+ SqlServerCleaner
+}
+
+class PostgresScalaEventsByTagMigrationTest
+ extends EventsByTagMigrationTest("postgres-application.conf")
+ with PostgresCleaner {}
+
+class MySQLScalaEventByTagMigrationTest extends
EventsByTagMigrationTest("mysql-application.conf") with MysqlCleaner {
+
+ override def dropLegacyFKConstraint(): Unit =
+ dropConstraint(constraintType = "FOREIGN KEY", constraintDialect =
"FOREIGN KEY")
+
+ override def dropLegacyPKConstraint(): Unit =
+ dropConstraint(constraintType = "PRIMARY KEY", constraintDialect = "",
constraintNameDialect = "KEY")
+
+ override def addNewPKConstraint(): Unit =
+ addPKConstraint(constraintNameDialect = "")
+
+ override def addNewFKConstraint(): Unit =
+ addFKConstraint()
+
+ override def migrateLegacyRows(): Unit =
+ fillNewColumn(
+ joinDialect = joinSQL,
+ pidSetDialect =
+ s"${tagTableCfg.tableName}.${tagTableCfg.columnNames.persistenceId} =
${journalTableName}.${journalTableCfg.columnNames.persistenceId}",
+ seqNrSetDialect =
+ s"${tagTableCfg.tableName}.${tagTableCfg.columnNames.sequenceNumber} =
${journalTableName}.${journalTableCfg.columnNames.sequenceNumber}")
+}
+
+class OracleScalaEventByTagMigrationTest
+ extends EventsByTagMigrationTest("oracle-application.conf")
+ with OracleCleaner {
+
+ override def addNewColumn(): Unit = {
+ // mock event_id not null, in order to change it to null later
+ alterColumn(alterDialect = "MODIFY", changeToDialect = "NOT NULL")
+ }
+
+ override def dropLegacyFKConstraint(): Unit =
+ dropConstraint(constraintTableName = "USER_CONSTRAINTS", constraintType =
"R")
+
+ override def dropLegacyPKConstraint(): Unit =
+ dropConstraint(constraintTableName = "USER_CONSTRAINTS", constraintType =
"P")
+
+ override def migrateLegacyRows(): Unit =
+ withStatement { stmt =>
+ stmt.execute(s"""UPDATE ${tagTableCfg.tableName}
+ |SET (${tagTableCfg.columnNames.persistenceId},
${tagTableCfg.columnNames.sequenceNumber}) = (
+ | SELECT
${journalTableCfg.columnNames.persistenceId},
${journalTableCfg.columnNames.sequenceNumber}
+ | ${fromSQL}
+ |)
+ |WHERE EXISTS (
+ | SELECT 1
+ | ${fromSQL}
+ |)""".stripMargin)
+ }
+}
+
+class SqlServerScalaEventByTagMigrationTest
+ extends EventsByTagMigrationTest("sqlserver-application.conf")
+ with SqlServerCleaner {
+
+ override def addNewPKConstraint(): Unit = {
+ // Change new column not null
+ alterColumn(columnName = tagTableCfg.columnNames.persistenceId,
changeToDialect = "NVARCHAR(255) NOT NULL")
+ alterColumn(columnName = tagTableCfg.columnNames.sequenceNumber,
changeToDialect = "NUMERIC(10,0) NOT NULL")
+ super.addNewPKConstraint()
+ }
+}
diff --git
a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/MigrationScriptSpec.scala
b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/MigrationScriptSpec.scala
index 2687d28b..f69e3517 100644
---
a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/MigrationScriptSpec.scala
+++
b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/MigrationScriptSpec.scala
@@ -23,7 +23,7 @@ import org.apache.pekko
import pekko.Done
import pekko.actor.ActorSystem
import pekko.persistence.jdbc.state.scaladsl.StateSpecBase
-import pekko.persistence.jdbc.testkit.internal.{ SchemaType, SqlServer }
+import pekko.persistence.jdbc.testkit.internal.{ MariaDB, MySQL, SchemaType,
SqlServer }
import slick.jdbc.JdbcBackend.Database
import scala.util.Using
@@ -61,8 +61,28 @@ class SqlServerMigrationScriptSpec extends
MigrationScriptSpec(
ConfigFactory.load("sqlserver-application.conf"),
SqlServer
) {
+
+ private def recreateEventTagBeforeTagMigration(): Unit =
+ withStatement(db) { stmt =>
+ stmt.executeUpdate("DROP TABLE IF EXISTS \"event_tag\"")
+ stmt.executeUpdate("""
+ CREATE TABLE "event_tag"
+ (
+ "event_id" BIGINT NOT NULL,
+ "tag" NVARCHAR(255) NOT NULL,
+ PRIMARY KEY ("event_id", "tag"),
+ CONSTRAINT "fk_event_journal"
+ FOREIGN KEY ("event_id")
+ REFERENCES "dbo"."event_journal" ("ordering")
+ ON DELETE CASCADE
+ )
+ """)
+ }
+
"SQL Server nvarchar migration script" must {
"apply without errors" in {
+ recreateEventTagBeforeTagMigration()
+
val scriptPath =
getClass.getResource("/schema/sqlserver/migration-1.2.0/sqlserver-nvarchar-migration.sql").getPath
val sql = Using(scala.io.Source.fromFile(scriptPath))(_.mkString).get
@@ -78,10 +98,12 @@ class SqlServerMigrationScriptSpec extends
MigrationScriptSpec(
class MariaDBMigrationScriptSpec extends MigrationScriptSpec(
ConfigFactory.load("mariadb-application.conf"),
- SqlServer
+ MariaDB
) {
"MariaDB migration script" must {
"apply the schema and the migration without errors" in {
+ drop(MariaDB)
+
val schemaPath =
getClass.getResource("/schema/mariadb/mariadb-create-schema.sql").getPath
val schema = Using(scala.io.Source.fromFile(schemaPath))(_.mkString).get
applyScriptWithSlick(schema, db)
@@ -96,10 +118,13 @@ class MariaDBMigrationScriptSpec extends
MigrationScriptSpec(
class MySQLMigrationScriptSpec extends MigrationScriptSpec(
ConfigFactory.load("mysql-application.conf"),
- SqlServer
+ MySQL
) {
"MySQL migration script" must {
"apply the schema and the migration without errors" in {
+ drop(MySQL)
+ applyScriptWithSlick("DROP TABLE IF EXISTS journal", db)
+
val schemaPath =
getClass.getResource("/schema/mysql/mysql-create-schema-legacy.sql").getPath
val schema = Using(scala.io.Source.fromFile(schemaPath))(_.mkString).get
diff --git
a/migrator-integration-test/src/test/resources/mysql-application.conf
b/migrator-integration-test/src/test/resources/mysql-application.conf
index 5082bc7d..2929fbf4 100644
--- a/migrator-integration-test/src/test/resources/mysql-application.conf
+++ b/migrator-integration-test/src/test/resources/mysql-application.conf
@@ -39,7 +39,7 @@ slick {
db {
host = ${docker.host}
host = ${?DB_HOST}
- url =
"jdbc:mysql://"${slick.db.host}":3306/mysql?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true"
+ url =
"jdbc:mysql://"${slick.db.host}":3306/pekko_persistence_jdbc?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true"
user = "root"
password = "root"
driver = "com.mysql.cj.jdbc.Driver"
diff --git
a/migrator-integration-test/src/test/resources/schema/h2/h2-create-schema-legacy.sql
b/migrator-integration-test/src/test/resources/schema/h2/h2-create-schema-legacy.sql
index 2a82b090..e3b4d5fb 100644
---
a/migrator-integration-test/src/test/resources/schema/h2/h2-create-schema-legacy.sql
+++
b/migrator-integration-test/src/test/resources/schema/h2/h2-create-schema-legacy.sql
@@ -9,6 +9,18 @@ CREATE TABLE IF NOT EXISTS PUBLIC."journal" (
);
CREATE UNIQUE INDEX IF NOT EXISTS "journal_ordering_idx" ON
PUBLIC."journal"("ordering");
+CREATE TABLE IF NOT EXISTS PUBLIC."event_tag" (
+ "event_id" BIGINT,
+ "persistence_id" VARCHAR(255),
+ "sequence_number" BIGINT,
+ "tag" VARCHAR NOT NULL,
+ PRIMARY KEY("persistence_id", "sequence_number", "tag"),
+ CONSTRAINT fk_event_journal
+ FOREIGN KEY("persistence_id", "sequence_number")
+ REFERENCES "event_journal"("persistence_id", "sequence_number")
+ ON DELETE CASCADE
+);
+
CREATE TABLE IF NOT EXISTS PUBLIC."legacy_snapshot" (
"persistence_id" VARCHAR(255) NOT NULL,
"sequence_number" BIGINT NOT NULL,
diff --git
a/migrator-integration-test/src/test/resources/schema/mysql/mysql-create-schema-legacy.sql
b/migrator-integration-test/src/test/resources/schema/mysql/mysql-create-schema-legacy.sql
index 841e6556..1c4daee9 100644
---
a/migrator-integration-test/src/test/resources/schema/mysql/mysql-create-schema-legacy.sql
+++
b/migrator-integration-test/src/test/resources/schema/mysql/mysql-create-schema-legacy.sql
@@ -16,3 +16,15 @@ CREATE TABLE IF NOT EXISTS legacy_snapshot (
snapshot BLOB NOT NULL,
PRIMARY KEY (persistence_id, sequence_number)
);
+
+CREATE TABLE IF NOT EXISTS event_tag
+(
+ event_id BIGINT UNSIGNED,
+ persistence_id VARCHAR(255),
+ sequence_number BIGINT,
+ tag VARCHAR(255) NOT NULL,
+ PRIMARY KEY (persistence_id, sequence_number, tag),
+ FOREIGN KEY (persistence_id, sequence_number)
+ REFERENCES event_journal (persistence_id, sequence_number)
+ ON DELETE CASCADE
+ );
diff --git
a/migrator-integration-test/src/test/resources/schema/oracle/oracle-create-schema-legacy.sql
b/migrator-integration-test/src/test/resources/schema/oracle/oracle-create-schema-legacy.sql
index 81efa5aa..bda5f9b6 100644
---
a/migrator-integration-test/src/test/resources/schema/oracle/oracle-create-schema-legacy.sql
+++
b/migrator-integration-test/src/test/resources/schema/oracle/oracle-create-schema-legacy.sql
@@ -12,6 +12,17 @@ CREATE TABLE JOURNAL (
)
/
+CREATE TABLE EVENT_TAG (
+ EVENT_ID NUMERIC,
+ PERSISTENCE_ID VARCHAR(255),
+ SEQUENCE_NUMBER NUMERIC,
+ TAG VARCHAR(255) NOT NULL,
+ PRIMARY KEY(PERSISTENCE_ID, SEQUENCE_NUMBER, TAG),
+ FOREIGN KEY(PERSISTENCE_ID, SEQUENCE_NUMBER)
REFERENCES EVENT_JOURNAL(PERSISTENCE_ID, SEQUENCE_NUMBER)
+ ON DELETE CASCADE
+)
+/
+
CREATE TABLE LEGACY_SNAPSHOT (
PERSISTENCE_ID VARCHAR(255) NOT NULL,
SEQUENCE_NUMBER NUMERIC NOT NULL,
diff --git
a/migrator-integration-test/src/test/resources/schema/postgres/postgres-create-schema-legacy.sql
b/migrator-integration-test/src/test/resources/schema/postgres/postgres-create-schema-legacy.sql
index 123d5dea..6baf317e 100644
---
a/migrator-integration-test/src/test/resources/schema/postgres/postgres-create-schema-legacy.sql
+++
b/migrator-integration-test/src/test/resources/schema/postgres/postgres-create-schema-legacy.sql
@@ -9,6 +9,18 @@ CREATE TABLE IF NOT EXISTS public.journal (
);
CREATE UNIQUE INDEX IF NOT EXISTS journal_ordering_idx ON
public.journal(ordering);
+CREATE TABLE IF NOT EXISTS public.event_tag (
+ event_id BIGINT,
+ persistence_id VARCHAR(255),
+ sequence_number BIGINT,
+ tag VARCHAR(256),
+ PRIMARY KEY(persistence_id, sequence_number, tag),
+ CONSTRAINT fk_event_journal
+ FOREIGN KEY(persistence_id, sequence_number)
+ REFERENCES event_journal(persistence_id, sequence_number)
+ ON DELETE CASCADE
+);
+
CREATE TABLE IF NOT EXISTS public.legacy_snapshot (
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
diff --git
a/migrator-integration-test/src/test/resources/schema/sqlserver/sqlserver-create-schema-legacy.sql
b/migrator-integration-test/src/test/resources/schema/sqlserver/sqlserver-create-schema-legacy.sql
index f89beabc..194773de 100644
---
a/migrator-integration-test/src/test/resources/schema/sqlserver/sqlserver-create-schema-legacy.sql
+++
b/migrator-integration-test/src/test/resources/schema/sqlserver/sqlserver-create-schema-legacy.sql
@@ -9,6 +9,19 @@ CREATE TABLE journal (
)
CREATE UNIQUE INDEX journal_ordering_idx ON journal (ordering);
+create table event_tag
+(
+ "event_id" BIGINT,
+ "persistence_id" NVARCHAR(255),
+ "sequence_number" NUMERIC(10, 0),
+ "tag" NVARCHAR(255) NOT NULL
+ primary key ("persistence_id", "sequence_number","tag"),
+ constraint "fk_event_journal"
+ foreign key ("persistence_id", "sequence_number")
+ references "dbo"."event_journal" ("persistence_id",
"sequence_number")
+ on delete CASCADE
+);
+
CREATE TABLE legacy_snapshot (
"persistence_id" VARCHAR(255) NOT NULL,
"sequence_number" NUMERIC(10,0) NOT NULL,
diff --git
a/migrator/src/main/scala/org/apache/pekko/persistence/jdbc/migrator/JournalMigrator.scala
b/migrator/src/main/scala/org/apache/pekko/persistence/jdbc/migrator/JournalMigrator.scala
index f67befbf..5471adcf 100644
---
a/migrator/src/main/scala/org/apache/pekko/persistence/jdbc/migrator/JournalMigrator.scala
+++
b/migrator/src/main/scala/org/apache/pekko/persistence/jdbc/migrator/JournalMigrator.scala
@@ -147,7 +147,12 @@ final case class JournalMigrator(profile:
JdbcProfile)(implicit system: ActorSys
for {
id <- newJournalQueries.JournalTable
.returning(newJournalQueries.JournalTable.map(_.ordering)) +=
journalSerializedRow
- tagInserts = tags.map(tag => TagRow(id, tag))
+ tagInserts = tags.map(tag =>
+ TagRow(
+ Some(id),
+ Some(journalSerializedRow.persistenceId),
+ Some(journalSerializedRow.sequenceNumber),
+ tag))
_ <- newJournalQueries.TagTable ++= tagInserts
} yield ()
}
diff --git a/scripts/docker-compose.yml b/scripts/docker-compose.yml
index 21843c65..9ea93bfa 100644
--- a/scripts/docker-compose.yml
+++ b/scripts/docker-compose.yml
@@ -19,6 +19,7 @@ services:
environment:
- "TZ=Europe/Amsterdam"
- "MYSQL_ROOT_PASSWORD=root"
+ - "MYSQL_DATABASE=pekko_persistence_jdbc"
ports:
- "3306:3306" # credentials (root:root)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]