Hi,

It seems there's a long-standing data loss issue related to the initial
sync of tables in the built-in logical replication (publications etc.).
I can reproduce it fairly reliably, but I haven't figured out all the
details yet and I'm a bit out of ideas, so I'm sharing what I know with
the hope someone takes a look and either spots the issue or has some
other insight ...

On the pgsql-bugs, Depesz reported reported [1] cases where tables are
added to a publication but end up missing rows on the subscriber. I
didn't know what might be the issue, but given his experience I decided
to take a do some blind attempts to reproduce the issue.

I'm not going to repeat all the details from the pgsql-bugs thread, but
I ended up writing a script that does randomized stress test tablesync
under concurrent load. Attached are two scripts, where crash-test.sh
does the main work, while run.sh drives the test - executes
crash-test.sh in a loop and generates random parameters for it.

The run.sh generates number of tables, refresh interval (after how many
tables we refresh subscription) and how long to sleep between steps (to
allow pgbench to do more work).

The crash-test.sh then does this:

  1) initializes two clusters (expects $PATH to have pg_ctl etc.)

  2) configures them for logical replication (wal_level, ...)

  3) creates publication and subscription on the nodes

  4) creates some a bunch of tables

  5) starts a pgbench that inserts data into the tables

  6) adds the tables to the publication one by one, occasionally
     refreshing the subscription

  7) waits for tablesync of all the tables to complete (so that the
     tables get into the 'r' state, thus replicating normally)

  8) stops the pgbench

  9) waits for the subscriber to fully catch up

  10) compares that the tables on publisher/subscriber nodes

To run this, just make sure PATH includes pg, and do e.g.

   ./run.sh 10

which does 10 runs of crash-test.sh with random parameters. Each run can
take a couple minutes, depending on the parameters, hardware etc.


Obviously, we expect the tables to match on the two nodes, but the
script regularly detects cases where the subscriber is missing some of
the rows. The script dumps those tables, and the rows contain timestamps
and LSNs to allow "rough correlation" (imperfect thanks to concurrency).

Depesz reported "gaps" in the data, i.e. missing a chunk of data, but
then following rows seemingly replicated. I did see such cases too, but
most of the time I see a missing chunk of rows at the end (but maybe if
the test continued a bit longer, it'd replicate some rows).

The report talks about replication between pg12->pg14, but I don't think
the cross-version part is necessary - I'm able to reproduce the issue on
individual versions (e.g. 12->12) since 12 (I haven't tried 11, but I'd
be surprised if it wasn't affected too).

The rows include `pg_current_wal_lsn()` to roughly track the LSN where
the row is inserted, and the "gap" of missing rows for each table seems
to match pg_subscription_rel.srsublsn, i.e. the LSN up to which
tablesync copied data, and the table should be replicated as usual.

Another interesting observation is that the issue only happens for "bulk
insert" transactions, i.e.

  BEGIN;
  ... INSERT into all tables ...
  COMMIT;

but not when each insert is a separate transaction. A bit strange.


After quite a bit of debugging, I came to the conclusion this happens
because we fail to invalidate caches on the publisher, so it does not
realize it should start sending rows for that table.

In particular, we initially build RelationSyncEntry when the table is
not yet included in the publication, so we end up with pubinsert=false,
thus not replicating the inserts. Which makes sense, but we then seems
to fail to invalidate the entry after it's added to the publication.

The other problem is that even if we happen to invalidate the entry, we
call GetRelationPublications(). But even if it happens long after the
table gets added to the publication (both in time and LSN terms), it
still returns NIL as if the table had no publications. And we end up
with pubinsert=false, skipping the inserts again.

Attached are three patches against master. 0001 adds some debug logging
that I found useful when investigating the issue. 0002 illustrates the
issue by forcefully invalidating the entry for each change, and
implementing a non-syscache variant of the GetRelationPublication().
This makes the code unbearably slow, but with both changes in place I
can no longer reproduce the issue. Undoing either of the two changes
makes it reproducible again. (I'll talk about 0003 later.)

I suppose timing matters, so it's possible it gets "fixed" simply
because of that, but I find that unlikely given the number of runs I did
without observing any failure.

Overall, this looks, walks and quacks like a cache invalidation issue,
likely a missing invalidation somewhere in the ALTER PUBLICATION code.
If we fail to invalidate the pg_publication_rel syscache somewhere, that
obviously explain why GetRelationPublications() returns stale data, but
it would also explain why the RelationSyncEntry is not invalidated, as
that happens in a syscache callback.

But I tried to do various crazy things in the ALTER PUBLICATION code,
and none of that worked, so I'm a bit confused/lost.


However, while randomly poking at different things, I realized that if I
change the lock obtained on the relation in OpenTableList() from
ShareUpdateExclusiveLock to ShareRowExclusiveLock, the issue goes away.
I don't know why it works, and I don't even recall what exactly led me
to the idea of changing it.

This is what 0003 does - it reverts 0002 and changes the lock level.

AFAIK the logical decoding code doesn't actually acquire locks on the
decoded tables, so why would this change matter? The only place that
does lock the relation is the tablesync, which gets RowExclusiveLock on
it. And it's interesting that RowExclusiveLock does not conflict with
ShareUpdateExclusiveLock, but does with ShareRowExclusiveLock. But why
would this even matter, when the tablesync can only touch the table
after it gets added to the publication?


regards

[1] https://www.postgresql.org/message-id/ztu8gtdajckzv...@depesz.com

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 4253c364838daf26c056c56d693bc00b1e3e8f73 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Thu, 16 Nov 2023 17:46:14 +0100
Subject: [PATCH 1/3] debug logging

---
 src/backend/catalog/pg_subscription.c       |  8 ++++++
 src/backend/replication/logical/worker.c    | 11 ++++++++
 src/backend/replication/pgoutput/pgoutput.c | 29 ++++++++++++++++++++-
 3 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d6a978f1362..0a2a644c293 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -238,6 +238,8 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
 	bool		nulls[Natts_pg_subscription_rel];
 	Datum		values[Natts_pg_subscription_rel];
 
+	elog(LOG, "AddSubscriptionRelState relid %d state %c LSN %X/%X", relid, state, LSN_FORMAT_ARGS(sublsn));
+
 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 
 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
@@ -285,6 +287,8 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	Datum		values[Natts_pg_subscription_rel];
 	bool		replaces[Natts_pg_subscription_rel];
 
+	elog(LOG, "UpdateSubscriptionRelState relid %d state %c LSN %X/%X", relid, state, LSN_FORMAT_ARGS(sublsn));
+
 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 
 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
@@ -369,6 +373,8 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
 
 	table_close(rel, AccessShareLock);
 
+	elog(LOG, "GetSubscriptionRelState relid %d state %c LSN %X/%X", relid, substate, LSN_FORMAT_ARGS(*sublsn));
+
 	return substate;
 }
 
@@ -531,6 +537,8 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
 		else
 			relstate->lsn = DatumGetLSN(d);
 
+		elog(LOG, "GetSubscriptionRelations relid %d state %c LSN %X/%X", relstate->relid, relstate->state, LSN_FORMAT_ARGS(relstate->lsn));
+
 		res = lappend(res, relstate);
 	}
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 52a9f136ab9..42e5423172b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -503,6 +503,10 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 			return rel->state == SUBREL_STATE_READY;
 
 		case WORKERTYPE_APPLY:
+			elog(LOG, "should_apply_changes_for_rel relid %d return %d", RelationGetRelid(rel->localrel), (rel->state == SUBREL_STATE_READY ||
+					(rel->state == SUBREL_STATE_SYNCDONE &&
+					 rel->statelsn <= remote_final_lsn)));
+
 			return (rel->state == SUBREL_STATE_READY ||
 					(rel->state == SUBREL_STATE_SYNCDONE &&
 					 rel->statelsn <= remote_final_lsn));
@@ -2398,20 +2402,27 @@ apply_handle_insert(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	elog(LOG, "apply_handle_insert");
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
 	 */
 	if (is_skipping_changes() ||
 		handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
+	{
+		elog(LOG, "apply_handle_insert / skipping changes or streaming");
 		return;
+	}
 
 	begin_replication_step();
 
 	relid = logicalrep_read_insert(s, &newtup);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
+	elog(LOG, "apply_handle_insert relid %d state %c lsn %X/%X final %X/%X", RelationGetRelid(rel->localrel), rel->state, LSN_FORMAT_ARGS(rel->statelsn), LSN_FORMAT_ARGS(remote_final_lsn));
 	if (!should_apply_changes_for_rel(rel))
 	{
+		elog(LOG, "apply_handle_insert relid %d skipping", RelationGetRelid(rel->localrel));
 		/*
 		 * The relation can't become interesting in the middle of the
 		 * transaction so it's safe to unlock it.
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index e8add5ee5d9..09f53bea0a0 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1409,9 +1409,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	ReorderBufferChangeType action = change->action;
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
-
+elog(LOG, "pgoutput_change relid %d LSN %X/%X", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
 	if (!is_publishable_relation(relation))
+	{
+		elog(LOG, "pgoutput_change relid %d LSN %X/%X not publishable", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
 		return;
+	}
 
 	/*
 	 * Remember the xid for the change in streaming mode. We need to send xid
@@ -1429,7 +1432,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			if (!relentry->pubactions.pubinsert)
+			{
+				elog(LOG, "pgoutput_change relid %d LSN %X/%X pubinsert=false", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			if (!relentry->pubactions.pubupdate)
@@ -1502,7 +1508,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	 * of the row filter for old and new tuple.
 	 */
 	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
+	{
+		elog(LOG, "pgoutput_change relid %d LSN %X/%X pgoutput_row_filter", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
 		goto cleanup;
+	}
 
 	/*
 	 * Send BEGIN if we haven't yet.
@@ -1522,10 +1531,13 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	OutputPluginPrepareWrite(ctx, true);
 
+	elog(LOG, "pgoutput_change relid %d LSN %X/%X sending data", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
+
 	/* Send the data */
 	switch (action)
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
+			elog(LOG, "pgoutput_change relid %d LSN %X/%X write insert message", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
 			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
 									data->binary, relentry->columns);
 			break;
@@ -1974,6 +1986,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 	/* initialize entry, if it's new */
 	if (!found)
 	{
+		elog(LOG, "get_rel_sync_entry relation %d not found", RelationGetRelid(relation));
 		entry->replicate_valid = false;
 		entry->schema_sent = false;
 		entry->streamed_txns = NIL;
@@ -1988,6 +2001,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->attrmap = NULL;
 	}
 
+	elog(LOG, "get_rel_sync_entry relation %d replicate_valid %d", RelationGetRelid(relation), entry->replicate_valid);
+
 	/* Validate the entry */
 	if (!entry->replicate_valid)
 	{
@@ -2007,6 +2022,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		char		relkind = get_rel_relkind(relid);
 		List	   *rel_publications = NIL;
 
+		elog(LOG, "GetRelationPublications relation %d publications %p %d", relid, pubids, list_length(pubids));
+
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
 		{
@@ -2021,6 +2038,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			publications_valid = true;
 		}
 
+		elog(LOG, "get_rel_sync_entry relation %d publications %d", RelationGetRelid(relation), list_length(data->publications));
+
 		/*
 		 * Reset schema_sent status as the relation definition may have
 		 * changed.  Also reset pubactions to empty in case rel was dropped
@@ -2097,6 +2116,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				}
 			}
 
+			elog(LOG, "get_rel_sync_entry relation %d publish %d (A)", RelationGetRelid(relation), publish);
+
 			if (!publish)
 			{
 				bool		ancestor_published = false;
@@ -2128,12 +2149,16 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 					}
 				}
 
+				elog(LOG, "get_rel_sync_entry relation %d relation pubs %d subscription pub %d", RelationGetRelid(relation), list_length(pubids), pub->oid);
+
 				if (list_member_oid(pubids, pub->oid) ||
 					list_member_oid(schemaPubids, pub->oid) ||
 					ancestor_published)
 					publish = true;
 			}
 
+			elog(LOG, "get_rel_sync_entry relation %d publish %d (B)", RelationGetRelid(relation), publish);
+
 			/*
 			 * If the relation is to be published, determine actions to
 			 * publish, and list of columns, if appropriate.
@@ -2210,6 +2235,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->replicate_valid = true;
 	}
 
+	elog(LOG, "get_rel_sync_entry relation %d pubinsert %d", RelationGetRelid(relation), entry->pubactions.pubinsert);
+
 	return entry;
 }
 
-- 
2.41.0

From bc400e7c9c8ad2ac4a0c431dfab9ec2f78bd5870 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Thu, 16 Nov 2023 19:57:33 +0100
Subject: [PATCH 2/3] experimental fix

---
 src/backend/replication/pgoutput/pgoutput.c | 54 ++++++++++++++++++++-
 1 file changed, 53 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 09f53bea0a0..adcd5688045 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -12,6 +12,8 @@
  */
 #include "postgres.h"
 
+#include "access/genam.h"
+#include "access/table.h"
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
@@ -29,6 +31,7 @@
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -1958,6 +1961,51 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
 	MemoryContextSwitchTo(oldctx);
 }
 
+static List*
+GetRelationPublicationsRaw(Oid relid)
+{
+	List *result = NIL;
+	SysScanDesc scandesc;
+	Relation relation;
+	ScanKeyData key[1];
+	HeapTuple	tup;
+
+	elog(LOG, "GetRelationPublicationsRaw relid %d / start", relid);
+
+	relation = table_open(PublicationRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&key[0],
+				Anum_pg_publication_rel_prrelid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(relid));
+
+	scandesc = systable_beginscan(relation,
+								  PublicationRelPrrelidPrpubidIndexId,
+								  true,
+								  NULL,
+								  1,
+								  key);
+
+	while (HeapTupleIsValid(tup = systable_getnext(scandesc)))
+	{
+		Form_pg_publication_rel form;
+
+		form = (Form_pg_publication_rel) GETSTRUCT(tup);
+
+		elog(LOG, "GetRelationPublicationsRaw relid %d / tuple pub %d", relid, form->prpubid);
+
+		result = lappend_oid(result, form->prpubid);
+	}
+
+	systable_endscan(scandesc);
+
+	table_close(relation, AccessShareLock);
+
+	elog(LOG, "GetRelationPublicationsRaw relid %d / end", relid);
+
+	return result;
+}
+
 /*
  * Find or create entry in the relation schema cache.
  *
@@ -2003,11 +2051,15 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 
 	elog(LOG, "get_rel_sync_entry relation %d replicate_valid %d", RelationGetRelid(relation), entry->replicate_valid);
 
+	/* force refresh of the entry for each change */
+	entry->replicate_valid = false;
+
 	/* Validate the entry */
 	if (!entry->replicate_valid)
 	{
 		Oid			schemaId = get_rel_namespace(relid);
-		List	   *pubids = GetRelationPublications(relid);
+		List	   *pubids = GetRelationPublicationsRaw(relid);
+		// List	   *pubids = GetRelationPublications(relid);
 
 		/*
 		 * We don't acquire a lock on the namespace system table as we build
-- 
2.41.0

From a95b5fe1d8243e16fdf5e2db5a773495f4641829 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Fri, 17 Nov 2023 11:20:50 +0100
Subject: [PATCH 3/3] alternative experimental fix - lock

---
 src/backend/commands/publicationcmds.c      |  2 +-
 src/backend/replication/pgoutput/pgoutput.c | 54 +--------------------
 2 files changed, 2 insertions(+), 54 deletions(-)

diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index f4ba572697a..7a4c315183a 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -1575,7 +1575,7 @@ OpenTableList(List *tables)
 		/* Allow query cancel in case this takes a long time */
 		CHECK_FOR_INTERRUPTS();
 
-		rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
+		rel = table_openrv(t->relation, ShareRowExclusiveLock);
 		myrelid = RelationGetRelid(rel);
 
 		/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index adcd5688045..09f53bea0a0 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -12,8 +12,6 @@
  */
 #include "postgres.h"
 
-#include "access/genam.h"
-#include "access/table.h"
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
@@ -31,7 +29,6 @@
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
 #include "utils/builtins.h"
-#include "utils/fmgroids.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -1961,51 +1958,6 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
 	MemoryContextSwitchTo(oldctx);
 }
 
-static List*
-GetRelationPublicationsRaw(Oid relid)
-{
-	List *result = NIL;
-	SysScanDesc scandesc;
-	Relation relation;
-	ScanKeyData key[1];
-	HeapTuple	tup;
-
-	elog(LOG, "GetRelationPublicationsRaw relid %d / start", relid);
-
-	relation = table_open(PublicationRelRelationId, AccessShareLock);
-
-	ScanKeyInit(&key[0],
-				Anum_pg_publication_rel_prrelid,
-				BTEqualStrategyNumber, F_OIDEQ,
-				ObjectIdGetDatum(relid));
-
-	scandesc = systable_beginscan(relation,
-								  PublicationRelPrrelidPrpubidIndexId,
-								  true,
-								  NULL,
-								  1,
-								  key);
-
-	while (HeapTupleIsValid(tup = systable_getnext(scandesc)))
-	{
-		Form_pg_publication_rel form;
-
-		form = (Form_pg_publication_rel) GETSTRUCT(tup);
-
-		elog(LOG, "GetRelationPublicationsRaw relid %d / tuple pub %d", relid, form->prpubid);
-
-		result = lappend_oid(result, form->prpubid);
-	}
-
-	systable_endscan(scandesc);
-
-	table_close(relation, AccessShareLock);
-
-	elog(LOG, "GetRelationPublicationsRaw relid %d / end", relid);
-
-	return result;
-}
-
 /*
  * Find or create entry in the relation schema cache.
  *
@@ -2051,15 +2003,11 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 
 	elog(LOG, "get_rel_sync_entry relation %d replicate_valid %d", RelationGetRelid(relation), entry->replicate_valid);
 
-	/* force refresh of the entry for each change */
-	entry->replicate_valid = false;
-
 	/* Validate the entry */
 	if (!entry->replicate_valid)
 	{
 		Oid			schemaId = get_rel_namespace(relid);
-		List	   *pubids = GetRelationPublicationsRaw(relid);
-		// List	   *pubids = GetRelationPublications(relid);
+		List	   *pubids = GetRelationPublications(relid);
 
 		/*
 		 * We don't acquire a lock on the namespace system table as we build
-- 
2.41.0

Attachment: crash-test.sh
Description: application/shellscript

Attachment: run.sh
Description: application/shellscript

Reply via email to