Hi,

It seems there's a long-standing data loss issue related to the initial sync of tables in the built-in logical replication (publications etc.). I can reproduce it fairly reliably, but I haven't figured out all the details yet and I'm a bit out of ideas, so I'm sharing what I know in the hope that someone takes a look and either spots the issue or has some other insight ...
On the pgsql-bugs list, Depesz reported [1] cases where tables are added to a publication but end up missing rows on the subscriber. I didn't know what the issue might be, but given his experience I decided to make some blind attempts to reproduce it. I'm not going to repeat all the details from the pgsql-bugs thread, but I ended up writing a script that does a randomized stress test of tablesync under concurrent load.

Attached are two scripts: crash-test.sh does the main work, while run.sh drives the test - it executes crash-test.sh in a loop, generating random parameters for each run (the number of tables, the refresh interval - i.e. after how many tables we refresh the subscription - and how long to sleep between steps, to allow pgbench to do more work). crash-test.sh then does this:

1) initializes two clusters (expects $PATH to have pg_ctl etc.)
2) configures them for logical replication (wal_level, ...)
3) creates a publication and subscription on the nodes
4) creates a bunch of tables
5) starts a pgbench that inserts data into the tables
6) adds the tables to the publication one by one, occasionally refreshing the subscription
7) waits for tablesync of all the tables to complete (so that the tables get into the 'r' state, thus replicating normally)
8) stops the pgbench
9) waits for the subscriber to fully catch up
10) compares the tables on the publisher/subscriber nodes

To run this, just make sure PATH includes the pg binaries, and do e.g.

  ./run.sh 10

which does 10 runs of crash-test.sh with random parameters. Each run can take a couple minutes, depending on the parameters, hardware etc.

Obviously, we expect the tables to match on the two nodes, but the script regularly detects cases where the subscriber is missing some of the rows. The script dumps those tables, and the rows contain timestamps and LSNs to allow "rough correlation" (imperfect thanks to concurrency).

Depesz reported "gaps" in the data, i.e. a missing chunk of rows with the following rows seemingly replicated fine. I did see such cases too, but most of the time I see a missing chunk of rows at the end (though maybe if the test continued a bit longer, it'd replicate some more rows).

The report talks about replication between pg12->pg14, but I don't think the cross-version aspect is necessary - I'm able to reproduce the issue on individual versions since 12 (e.g. 12->12). I haven't tried 11, but I'd be surprised if it wasn't affected too.

The rows include pg_current_wal_lsn() to roughly track the LSN at which each row was inserted, and the "gap" of missing rows for each table seems to match pg_subscription_rel.srsublsn, i.e. the LSN up to which tablesync copied data and past which the table should be replicated as usual.

Another interesting observation is that the issue only happens for "bulk insert" transactions, i.e.

  BEGIN;
  ... INSERT into all tables ...
  COMMIT;

but not when each insert is a separate transaction. A bit strange.

After quite a bit of debugging, I came to the conclusion this happens because we fail to invalidate caches on the publisher, so it does not realize it should start sending rows for the table. In particular, we initially build the RelationSyncEntry when the table is not yet included in the publication, so we end up with pubinsert=false and don't replicate the inserts. Which makes sense, but we then seem to fail to invalidate the entry after the table is added to the publication. The other problem is that even if we do happen to invalidate the entry, we then call GetRelationPublications() - and even if that call happens long after the table was added to the publication (both in time and LSN terms), it still returns NIL, as if the table was in no publication at all. So we again end up with pubinsert=false, skipping the inserts.
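For reference, this is roughly how the invalidation is supposed to work - a simplified paraphrase of the wiring in pgoutput.c (from memory, with the hash table setup and other details elided; not the exact code): pgoutput registers a syscache callback on pg_publication_rel, and the callback marks every cached RelationSyncEntry invalid, so that get_rel_sync_entry() rebuilds it from the catalogs on the next decoded change.

/*
 * Simplified paraphrase of the invalidation wiring in pgoutput.c;
 * not the actual code, details elided.
 */
static HTAB *RelationSyncCache = NULL;

/* Syscache callback for pg_publication_rel changes. */
static void
rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
{
	HASH_SEQ_STATUS status;
	RelationSyncEntry *entry;

	/*
	 * Mark all entries invalid, so that get_rel_sync_entry() recomputes
	 * pubactions (pubinsert etc.) from the catalogs on the next change.
	 */
	hash_seq_init(&status, RelationSyncCache);
	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
		entry->replicate_valid = false;
}

static void
init_rel_sync_cache(MemoryContext cachectx)
{
	/* ... creation of the RelationSyncCache hash table elided ... */

	/* Invalidate cached entries whenever pg_publication_rel changes. */
	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
								  rel_sync_cache_publication_cb,
								  (Datum) 0);
}

So if the pg_publication_rel invalidation never arrives (or arrives before the catalog change is visible), the callback does not help, and the stale pubinsert=false entry keeps skipping the table's inserts indefinitely - which matches the behavior described above.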
Attached are three patches against master. 0001 adds some debug logging that I found useful when investigating the issue. 0002 illustrates the issue by forcefully invalidating the entry for each change, and by implementing a non-syscache variant of GetRelationPublications(). This makes the code unbearably slow, but with both changes in place I can no longer reproduce the issue. Undoing either of the two changes makes it reproducible again. (I'll talk about 0003 later.)

I suppose timing matters, so it's possible this gets "fixed" simply because of that, but I find that unlikely given the number of runs I did without observing any failure.

Overall, this looks, walks and quacks like a cache invalidation issue, likely a missing invalidation somewhere in the ALTER PUBLICATION code. If we fail to invalidate the pg_publication_rel syscache somewhere, that would obviously explain why GetRelationPublications() returns stale data, but it would also explain why the RelationSyncEntry is not invalidated, as that happens in a syscache callback. But I tried doing various crazy things in the ALTER PUBLICATION code, and none of that worked, so I'm a bit confused/lost.

However, while randomly poking at different things, I realized that if I change the lock obtained on the relation in OpenTableList() from ShareUpdateExclusiveLock to ShareRowExclusiveLock, the issue goes away. I don't know why it works, and I don't even recall what exactly led me to the idea of changing it. This is what 0003 does - it reverts 0002 and changes the lock level.

AFAIK the logical decoding code doesn't actually acquire locks on the decoded tables, so why would this change matter? The only place that does lock the relation is the tablesync worker, which gets RowExclusiveLock on it. And it's interesting that RowExclusiveLock does not conflict with ShareUpdateExclusiveLock, but does conflict with ShareRowExclusiveLock. But why would this even matter, when the tablesync can only touch the table after it gets added to the publication?

regards

[1] https://www.postgresql.org/message-id/ztu8gtdajckzv...@depesz.com

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 4253c364838daf26c056c56d693bc00b1e3e8f73 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Thu, 16 Nov 2023 17:46:14 +0100
Subject: [PATCH 1/3] debug logging

---
 src/backend/catalog/pg_subscription.c       |  8 ++++++
 src/backend/replication/logical/worker.c    | 11 ++++++++
 src/backend/replication/pgoutput/pgoutput.c | 29 ++++++++++++++++++++-
 3 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d6a978f1362..0a2a644c293 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -238,6 +238,8 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
 	bool		nulls[Natts_pg_subscription_rel];
 	Datum		values[Natts_pg_subscription_rel];
 
+	elog(LOG, "AddSubscriptionRelState relid %d state %c LSN %X/%X", relid, state, LSN_FORMAT_ARGS(sublsn));
+
 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 
 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
@@ -285,6 +287,8 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	Datum		values[Natts_pg_subscription_rel];
 	bool		replaces[Natts_pg_subscription_rel];
 
+	elog(LOG, "UpdateSubscriptionRelState relid %d state %c LSN %X/%X", relid, state, LSN_FORMAT_ARGS(sublsn));
+
 	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 
 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
@@ -369,6 +373,8 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
 
 	table_close(rel, AccessShareLock);
 
+	elog(LOG, "GetSubscriptionRelState relid %d state %c LSN %X/%X", relid, substate, LSN_FORMAT_ARGS(*sublsn));
+
 	return substate;
 }
 
@@ -531,6 +537,8 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
 		else
 			relstate->lsn = DatumGetLSN(d);
 
+		elog(LOG, "GetSubscriptionRelations relid %d state %c LSN %X/%X", relstate->relid, relstate->state, LSN_FORMAT_ARGS(relstate->lsn));
+
 		res = lappend(res, relstate);
 	}
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 52a9f136ab9..42e5423172b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -503,6 +503,10 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
 			return rel->state == SUBREL_STATE_READY;
 
 		case WORKERTYPE_APPLY:
+			elog(LOG, "should_apply_changes_for_rel relid %d return %d", RelationGetRelid(rel->localrel), (rel->state == SUBREL_STATE_READY ||
+					(rel->state == SUBREL_STATE_SYNCDONE &&
+					 rel->statelsn <= remote_final_lsn)));
+
 			return (rel->state == SUBREL_STATE_READY ||
 					(rel->state == SUBREL_STATE_SYNCDONE &&
 					 rel->statelsn <= remote_final_lsn));
@@ -2398,20 +2402,27 @@ apply_handle_insert(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	elog(LOG, "apply_handle_insert");
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
 	 */
 	if (is_skipping_changes() ||
 		handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
+	{
+		elog(LOG, "apply_handle_insert / skipping changes or streaming");
 		return;
+	}
 
 	begin_replication_step();
 
 	relid = logicalrep_read_insert(s, &newtup);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
+	elog(LOG, "apply_handle_insert relid %d state %c lsn %X/%X final %X/%X", RelationGetRelid(rel->localrel), rel->state, LSN_FORMAT_ARGS(rel->statelsn), LSN_FORMAT_ARGS(remote_final_lsn));
 	if (!should_apply_changes_for_rel(rel))
 	{
+		elog(LOG, "apply_handle_insert relid %d skipping", RelationGetRelid(rel->localrel));
 		/*
 		 * The relation can't become interesting in the middle of the
 		 * transaction so it's safe to unlock it.
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index e8add5ee5d9..09f53bea0a0 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1409,9 +1409,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	ReorderBufferChangeType action = change->action;
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
-
+elog(LOG, "pgoutput_change relid %d LSN %X/%X", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
 	if (!is_publishable_relation(relation))
+	{
+		elog(LOG, "pgoutput_change relid %d LSN %X/%X not publishable", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
 		return;
+	}
 
 	/*
 	 * Remember the xid for the change in streaming mode. We need to send xid
@@ -1429,7 +1432,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			if (!relentry->pubactions.pubinsert)
+			{
+				elog(LOG, "pgoutput_change relid %d LSN %X/%X pubinsert=false", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			if (!relentry->pubactions.pubupdate)
@@ -1502,7 +1508,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	 * of the row filter for old and new tuple.
 	 */
 	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
+	{
+		elog(LOG, "pgoutput_change relid %d LSN %X/%X pgoutput_row_filter", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
 		goto cleanup;
+	}
 
 	/*
 	 * Send BEGIN if we haven't yet.
@@ -1522,10 +1531,13 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	OutputPluginPrepareWrite(ctx, true);
 
+	elog(LOG, "pgoutput_change relid %d LSN %X/%X sending data", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
+
 	/* Send the data */
 	switch (action)
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
+			elog(LOG, "pgoutput_change relid %d LSN %X/%X write insert message", RelationGetRelid(relation), LSN_FORMAT_ARGS(change->lsn));
 			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
 									data->binary, relentry->columns);
 			break;
@@ -1974,6 +1986,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 	/* initialize entry, if it's new */
 	if (!found)
 	{
+		elog(LOG, "get_rel_sync_entry relation %d not found", RelationGetRelid(relation));
 		entry->replicate_valid = false;
 		entry->schema_sent = false;
 		entry->streamed_txns = NIL;
@@ -1988,6 +2001,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->attrmap = NULL;
 	}
 
+	elog(LOG, "get_rel_sync_entry relation %d replicate_valid %d", RelationGetRelid(relation), entry->replicate_valid);
+
 	/* Validate the entry */
 	if (!entry->replicate_valid)
 	{
@@ -2007,6 +2022,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		char		relkind = get_rel_relkind(relid);
 		List	   *rel_publications = NIL;
 
+		elog(LOG, "GetRelationPublications relation %d publications %p %d", relid, pubids, list_length(pubids));
+
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
 		{
@@ -2021,6 +2038,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			publications_valid = true;
 		}
 
+		elog(LOG, "get_rel_sync_entry relation %d publications %d", RelationGetRelid(relation), list_length(data->publications));
+
 		/*
 		 * Reset schema_sent status as the relation definition may have
 		 * changed. Also reset pubactions to empty in case rel was dropped
@@ -2097,6 +2116,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			}
 		}
 
+		elog(LOG, "get_rel_sync_entry relation %d publish %d (A)", RelationGetRelid(relation), publish);
+
 		if (!publish)
 		{
 			bool		ancestor_published = false;
@@ -2128,12 +2149,16 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				}
 			}
 
+			elog(LOG, "get_rel_sync_entry relation %d relation pubs %d subscription pub %d", RelationGetRelid(relation), list_length(pubids), pub->oid);
+
 			if (list_member_oid(pubids, pub->oid) ||
 				list_member_oid(schemaPubids, pub->oid) ||
 				ancestor_published)
 				publish = true;
 		}
 
+		elog(LOG, "get_rel_sync_entry relation %d publish %d (B)", RelationGetRelid(relation), publish);
+
 		/*
 		 * If the relation is to be published, determine actions to
 		 * publish, and list of columns, if appropriate.
@@ -2210,6 +2235,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->replicate_valid = true;
 	}
 
+	elog(LOG, "get_rel_sync_entry relation %d pubinsert %d", RelationGetRelid(relation), entry->pubactions.pubinsert);
+
 	return entry;
 }
 
-- 
2.41.0
From bc400e7c9c8ad2ac4a0c431dfab9ec2f78bd5870 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Thu, 16 Nov 2023 19:57:33 +0100
Subject: [PATCH 2/3] experimental fix

---
 src/backend/replication/pgoutput/pgoutput.c | 54 ++++++++++++++++++++-
 1 file changed, 53 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 09f53bea0a0..adcd5688045 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -12,6 +12,8 @@
  */
 #include "postgres.h"
 
+#include "access/genam.h"
+#include "access/table.h"
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
@@ -29,6 +31,7 @@
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -1958,6 +1961,51 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
 	MemoryContextSwitchTo(oldctx);
 }
 
+static List*
+GetRelationPublicationsRaw(Oid relid)
+{
+	List	   *result = NIL;
+	SysScanDesc scandesc;
+	Relation	relation;
+	ScanKeyData key[1];
+	HeapTuple	tup;
+
+	elog(LOG, "GetRelationPublicationsRaw relid %d / start", relid);
+
+	relation = table_open(PublicationRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&key[0],
+				Anum_pg_publication_rel_prrelid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(relid));
+
+	scandesc = systable_beginscan(relation,
+								  PublicationRelPrrelidPrpubidIndexId,
+								  true,
+								  NULL,
+								  1,
+								  key);
+
+	while (HeapTupleIsValid(tup = systable_getnext(scandesc)))
+	{
+		Form_pg_publication_rel form;
+
+		form = (Form_pg_publication_rel) GETSTRUCT(tup);
+
+		elog(LOG, "GetRelationPublicationsRaw relid %d / tuple pub %d", relid, form->prpubid);
+
+		result = lappend_oid(result, form->prpubid);
+	}
+
+	systable_endscan(scandesc);
+
+	table_close(relation, AccessShareLock);
+
+	elog(LOG, "GetRelationPublicationsRaw relid %d / end", relid);
+
+	return result;
+}
+
 /*
  * Find or create entry in the relation schema cache.
  *
@@ -2003,11 +2051,15 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 
 	elog(LOG, "get_rel_sync_entry relation %d replicate_valid %d", RelationGetRelid(relation), entry->replicate_valid);
 
+	/* force refresh of the entry for each change */
+	entry->replicate_valid = false;
+
 	/* Validate the entry */
 	if (!entry->replicate_valid)
 	{
 		Oid			schemaId = get_rel_namespace(relid);
-		List	   *pubids = GetRelationPublications(relid);
+		List	   *pubids = GetRelationPublicationsRaw(relid);
+		// List	   *pubids = GetRelationPublications(relid);
 
 		/*
 		 * We don't acquire a lock on the namespace system table as we build
-- 
2.41.0
From a95b5fe1d8243e16fdf5e2db5a773495f4641829 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Fri, 17 Nov 2023 11:20:50 +0100
Subject: [PATCH 3/3] alternative experimental fix - lock

---
 src/backend/commands/publicationcmds.c      |  2 +-
 src/backend/replication/pgoutput/pgoutput.c | 54 +--------------------
 2 files changed, 2 insertions(+), 54 deletions(-)

diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index f4ba572697a..7a4c315183a 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -1575,7 +1575,7 @@ OpenTableList(List *tables)
 		/* Allow query cancel in case this takes a long time */
 		CHECK_FOR_INTERRUPTS();
 
-		rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
+		rel = table_openrv(t->relation, ShareRowExclusiveLock);
 		myrelid = RelationGetRelid(rel);
 
 		/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index adcd5688045..09f53bea0a0 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -12,8 +12,6 @@
  */
 #include "postgres.h"
 
-#include "access/genam.h"
-#include "access/table.h"
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
@@ -31,7 +29,6 @@
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
 #include "utils/builtins.h"
-#include "utils/fmgroids.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -1961,51 +1958,6 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
 	MemoryContextSwitchTo(oldctx);
 }
 
-static List*
-GetRelationPublicationsRaw(Oid relid)
-{
-	List	   *result = NIL;
-	SysScanDesc scandesc;
-	Relation	relation;
-	ScanKeyData key[1];
-	HeapTuple	tup;
-
-	elog(LOG, "GetRelationPublicationsRaw relid %d / start", relid);
-
-	relation = table_open(PublicationRelRelationId, AccessShareLock);
-
-	ScanKeyInit(&key[0],
-				Anum_pg_publication_rel_prrelid,
-				BTEqualStrategyNumber, F_OIDEQ,
-				ObjectIdGetDatum(relid));
-
-	scandesc = systable_beginscan(relation,
-								  PublicationRelPrrelidPrpubidIndexId,
-								  true,
-								  NULL,
-								  1,
-								  key);
-
-	while (HeapTupleIsValid(tup = systable_getnext(scandesc)))
-	{
-		Form_pg_publication_rel form;
-
-		form = (Form_pg_publication_rel) GETSTRUCT(tup);
-
-		elog(LOG, "GetRelationPublicationsRaw relid %d / tuple pub %d", relid, form->prpubid);
-
-		result = lappend_oid(result, form->prpubid);
-	}
-
-	systable_endscan(scandesc);
-
-	table_close(relation, AccessShareLock);
-
-	elog(LOG, "GetRelationPublicationsRaw relid %d / end", relid);
-
-	return result;
-}
-
 /*
  * Find or create entry in the relation schema cache.
  *
@@ -2051,15 +2003,11 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 
 	elog(LOG, "get_rel_sync_entry relation %d replicate_valid %d", RelationGetRelid(relation), entry->replicate_valid);
 
-	/* force refresh of the entry for each change */
-	entry->replicate_valid = false;
-
 	/* Validate the entry */
 	if (!entry->replicate_valid)
 	{
 		Oid			schemaId = get_rel_namespace(relid);
-		List	   *pubids = GetRelationPublicationsRaw(relid);
-		// List	   *pubids = GetRelationPublications(relid);
+		List	   *pubids = GetRelationPublications(relid);
 
 		/*
 		 * We don't acquire a lock on the namespace system table as we build
-- 
2.41.0
crash-test.sh
Description: application/shellscript
run.sh
Description: application/shellscript