On Tue, Jan 18, 2022 at 12:04 PM tanghy.f...@fujitsu.com
<tanghy.f...@fujitsu.com> wrote:
>
> On Mon, Jan 17, 2022 2:18 PM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> >
> > I've attached an updated patch. Please review it.
> >
>
> Thanks for updating the patch. Few comments:
>
> 1)
>         /* Two_phase is only supported in v15 and higher */
>         if (pset.sversion >= 150000)
>                 appendPQExpBuffer(&buf,
> -                                 ", subtwophasestate AS \"%s\"\n",
> -                                 gettext_noop("Two phase commit"));
> +                                 ", subtwophasestate AS \"%s\"\n"
> +                                 ", subskipxid AS \"%s\"\n",
> +                                 gettext_noop("Two phase commit"),
> +                                 gettext_noop("Skip XID"));
>
>         appendPQExpBuffer(&buf,
>                           ", subsynccommit AS \"%s\"\n"
>
> I think "skip xid" should be mentioned in the comment. Maybe it could be
> changed to:
> "Two_phase and skip XID are only supported in v15 and higher"
Added.

>
> 2) The following two places are not consistent in whether "= value" is
> surround with square brackets.
>
> +ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SKIP ( <replaceable class="parameter">skip_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
>
> +    <term><literal>SKIP ( <replaceable class="parameter">skip_option</replaceable> = <replaceable class="parameter">value</replaceable> [, ... ] )</literal></term>
>
> Should we modify the first place to:
> +ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SKIP ( <replaceable class="parameter">skip_option</replaceable> = <replaceable class="parameter">value</replaceable> [, ... ] )
>
> Because currently there is only one skip_option - xid, and a parameter must be
> specified when using it.

Good catch. Fixed.

>
> 3)
> +        * Protect subskip_xid of pg_subscription from being concurrently updated
> +        * while clearing it.
>
> "subskip_xid" should be "subskipxid" I think.

Fixed.

>
> 4)
> +/*
> + * Start skipping changes of the transaction if the given XID matches the
> + * transaction ID specified by skip_xid option.
> + */
>
> The option name was "skip_xid" in the previous version, and it is "xid" in
> latest patch. So should we modify "skip_xid option" to "skip xid option", or
> "skip option xid", or something else?
>
> Also the following place has similar issue:
> + * the subscription if hte user has specified skip_xid. Once we start skipping

Fixed.

I've attached an updated patch. All comments I got so far were
incorporated into this patch unless I'm missing something.

Regards,

--
Masahiko Sawada
EDB:  https://www.enterprisedb.com/
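For reviewers who want to check the option handling without building the tree, here is a standalone C sketch of two rules this version implements: which values of xid in ALTER SUBSCRIPTION ... SKIP (xid = ...) are accepted, and which messages the apply worker drops while skipping. It is an illustrative model under stated assumptions, not code from the patch: every sketch_* / Sketch* name is hypothetical, only decimal input is accepted (the patch itself parses through xidin), and XIDs 0, 1 and 2 are treated as reserved, which is what TransactionIdIsNormal() and the regression test expectations (0, 1 and 2 rejected; 3 and 4294967295 accepted) imply.

```c
/*
 * Standalone model (hypothetical names, not PostgreSQL code) of two rules:
 *   1. accepting/rejecting the value of ALTER SUBSCRIPTION ... SKIP (xid = ...)
 *   2. dropping only data-modification messages while a transaction is skipped
 */
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uint32_t SketchXid;     /* hypothetical stand-in for TransactionId */

#define SKETCH_INVALID_XID      ((SketchXid) 0)
#define SKETCH_FIRST_NORMAL_XID ((SketchXid) 3)    /* 0, 1, 2 are reserved */

/*
 * Parse the option value into *out. "none" resets to SKETCH_INVALID_XID.
 * Non-decimal input, values above UINT32_MAX and reserved XIDs are rejected.
 */
static bool
sketch_parse_skip_xid(const char *str, SketchXid *out)
{
    char               *end;
    unsigned long long  val;

    if (strcmp(str, "none") == 0)
    {
        *out = SKETCH_INVALID_XID;
        return true;
    }

    /* Require a leading digit: rejects "", "-1", " 5" and so on. */
    if (str[0] < '0' || str[0] > '9')
        return false;

    errno = 0;
    val = strtoull(str, &end, 10);
    if (errno != 0 || *end != '\0' || val > UINT32_MAX)
        return false;

    /* Same test as TransactionIdIsNormal(): xid >= 3 */
    if (val < SKETCH_FIRST_NORMAL_XID)
        return false;

    *out = (SketchXid) val;
    return true;
}

/* Subset of message kinds (the real enum is LogicalRepMsgType). */
typedef enum
{
    SKETCH_MSG_BEGIN,
    SKETCH_MSG_RELATION,
    SKETCH_MSG_INSERT,
    SKETCH_MSG_UPDATE,
    SKETCH_MSG_DELETE,
    SKETCH_MSG_TRUNCATE,
    SKETCH_MSG_COMMIT
} SketchMsgType;

/* Plays the role of the worker's static skip_xid. */
static SketchXid sketch_skip_xid = SKETCH_INVALID_XID;

/* Like maybe_start_skipping_changes(): arm only on an exact XID match. */
static void
sketch_maybe_start_skipping(SketchXid configured, SketchXid remote_xid)
{
    assert(sketch_skip_xid == SKETCH_INVALID_XID);

    if (configured != SKETCH_INVALID_XID && configured == remote_xid)
        sketch_skip_xid = remote_xid;
}

/* Like the early return added to apply_dispatch(). */
static bool
sketch_should_drop(SketchMsgType action)
{
    return sketch_skip_xid != SKETCH_INVALID_XID &&
        (action == SKETCH_MSG_INSERT || action == SKETCH_MSG_UPDATE ||
         action == SKETCH_MSG_DELETE || action == SKETCH_MSG_TRUNCATE);
}

/* Like stop_skipping_changes(), minus the catalog update. */
static void
sketch_stop_skipping(void)
{
    sketch_skip_xid = SKETCH_INVALID_XID;
}
```

The filter deliberately leaves BEGIN, COMMIT, RELATION and similar messages alone, as the check added to apply_dispatch() does, so transaction boundaries are still processed (which is where subskipxid gets cleared) while only the data changes are dropped.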
From 9faf874a7388368f86c500e1fef9616ecf86e5b5 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 10 Dec 2021 14:41:30 +0900
Subject: [PATCH v7] Add ALTER SUBSCRIPTION ... SKIP to skip the transaction on subscriber nodes

If an incoming change violates any constraint, logical replication stops
until the conflict is resolved. This commit introduces another way to skip
the transaction in question, other than manually updating the subscriber's
database or using pg_replication_origin_advance().

The user can specify the XID with ALTER SUBSCRIPTION ... SKIP (xid = XXX),
which updates the pg_subscription.subskipxid field and tells the apply
worker to skip the transaction. The apply worker skips all data
modification changes within the specified transaction. After skipping the
transaction, the apply worker clears pg_subscription.subskipxid.
---
 doc/src/sgml/catalogs.sgml                 |  10 +
 doc/src/sgml/logical-replication.sgml      |  49 +++-
 doc/src/sgml/ref/alter_subscription.sgml   |  42 ++++
 src/backend/catalog/pg_subscription.c      |   1 +
 src/backend/commands/subscriptioncmds.c    |  53 +++++
 src/backend/parser/gram.y                  |   9 +
 src/backend/replication/logical/worker.c   | 265 ++++++++++++++++++++-
 src/bin/pg_dump/pg_dump.c                  |   4 +
 src/bin/psql/describe.c                    |  10 +-
 src/bin/psql/tab-complete.c                |   8 +-
 src/include/catalog/pg_subscription.h      |   4 +
 src/include/nodes/parsenodes.h             |   3 +-
 src/test/regress/expected/subscription.out | 124 ++++++----
 src/test/regress/sql/subscription.sql      |  17 ++
 src/test/subscription/t/028_skip_xact.pl   | 217 +++++++++++++++++
 15 files changed, 755 insertions(+), 61 deletions(-)
 create mode 100644 src/test/subscription/t/028_skip_xact.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 2aeb2ef346..16f429b853 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7746,6 +7746,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para
role="column_definition"> + <structfield>subskipxid</structfield> <type>xid</type> + </para> + <para> + ID of the transaction whose changes are to be skipped, if a valid + transaction ID; otherwise 0. + </para></entry> + </row> + <row> <entry role="catalog_table_entry"><para role="column_definition"> <structfield>subconninfo</structfield> <type>text</type> diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 96b4886e08..de4f83bbcc 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -353,15 +353,58 @@ <para> The resolution can be done either by changing data or permissions on the subscriber so - that it does not conflict with the incoming change or by skipping the - transaction that conflicts with the existing data. The transaction can be - skipped by calling the <link linkend="pg-replication-origin-advance"> + that it does not conflict with the incoming changes or by skipping + the transaction that conflicts with the existing data. When a conflict + produces an error, it is shown in the + <structname>pg_stat_subscription_workers</structname> view as follows: + </para> + + <programlisting> +postgres=# SELECT * FROM pg_stat_subscription_workers; +-[ RECORD 1 ]------+----------------------------------------------------------- +subid | 16391 +subname | test_sub +subrelid | +last_error_relid | 16385 +last_error_command | INSERT +last_error_xid | 716 +last_error_count | 50 +last_error_message | duplicate key value violates unique constraint "test_pkey" +last_error_time | 2021-09-29 15:52:45.165754+00 +</programlisting> + + <para> + and it is also shown in the subscriber's server log as follows: + </para> + +<screen> +ERROR: duplicate key value violates unique constraint "test_pkey" +DETAIL: Key (id)=(1) already exists.
+CONTEXT: processing remote data during "INSERT" for replication target relation "public.test" in transaction 716 at 2021-09-29 15:52:45.165754+00 +</screen> + + <para> + The ID of the transaction containing the change that violates the constraint can be + found in those outputs (transaction ID 716 in the above case). The transaction + can be skipped by using <command>ALTER SUBSCRIPTION ... SKIP</command> on the + subscription. Alternatively, the transaction can be skipped by calling the + <link linkend="pg-replication-origin-advance"> <function>pg_replication_origin_advance()</function></link> function with a <parameter>node_name</parameter> corresponding to the subscription name, and a position. The current position of origins can be seen in the <link linkend="view-pg-replication-origin-status"> <structname>pg_replication_origin_status</structname></link> system view. </para> + + <para> + To resolve conflicts, consider changing the data on the subscriber so + that it doesn't conflict with incoming changes, dropping the conflicting constraint + or unique index, writing a trigger on the subscriber to suppress or redirect + conflicting incoming changes, or, as a last resort, skipping the whole transaction. + Skipping the whole transaction includes skipping changes that may not violate + any constraint. This can easily make the subscriber inconsistent, especially if + a user specifies the wrong transaction ID or origin position.
+ </para> </sect1> <sect1 id="logical-replication-restrictions"> diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 0b027cc346..8b4568ddab 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -29,6 +29,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> REFRESH PUB ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ENABLE ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DISABLE ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) +ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SKIP ( <replaceable class="parameter">skip_option</replaceable> = <replaceable class="parameter">value</replaceable> ) ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER } ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable> </synopsis> @@ -207,6 +208,47 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < </listitem> </varlistentry> + <varlistentry> + <term><literal>SKIP ( <replaceable class="parameter">skip_option</replaceable> = <replaceable class="parameter">value</replaceable> [, ... ] )</literal></term> + <listitem> + <para> + Skips applying all changes of the specified transaction. If incoming data + violates any constraints, logical replication will stop until the conflict is + resolved. The resolution can be done either by changing data on the + subscriber so that it doesn't conflict with incoming changes or by skipping + the whole transaction.
Using the <command>ALTER SUBSCRIPTION ... SKIP</command> + command, the logical replication worker skips all data modification changes + within the specified transaction, including changes that may not violate + the constraint, so it should only be used as a last resort. This option has + no effect on transactions that are already prepared by enabling + <literal>two_phase</literal> on the subscriber. After the logical replication worker + successfully skips the transaction, the transaction ID (stored in + <structname>pg_subscription</structname>.<structfield>subskipxid</structfield>) + is cleared. See <xref linkend="logical-replication-conflicts"/> for + details of logical replication conflicts. + </para> + + <para> + <replaceable>skip_option</replaceable> specifies options for this operation. + The supported option is: + + <variablelist> + <varlistentry> + <term><literal>xid</literal> (<type>xid</type>)</term> + <listitem> + <para> + Specifies the ID of the transaction whose changes are to be skipped + by the logical replication worker. Skipping individual + subtransactions is not supported. Setting <literal>NONE</literal> + resets the transaction ID.
+ </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + <varlistentry> <term><replaceable class="parameter">new_owner</replaceable></term> <listitem> diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index ca65a8bd20..da199e9a3e 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -69,6 +69,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->binary = subform->subbinary; sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; + sub->skipxid = subform->subskipxid; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f5eba450ce..0ff0e00f19 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -61,6 +61,7 @@ #define SUBOPT_BINARY 0x00000080 #define SUBOPT_STREAMING 0x00000100 #define SUBOPT_TWOPHASE_COMMIT 0x00000200 +#define SUBOPT_XID 0x00000400 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -82,6 +83,8 @@ typedef struct SubOpts bool binary; bool streaming; bool twophase; + TransactionId xid; /* InvalidTransactionId for resetting purpose, otherwise + * normal transaction id */ } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -249,6 +252,33 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT; opts->twophase = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_XID) && + strcmp(defel->defname, "xid") == 0) + { + char *xid_str = defGetString(defel); + TransactionId xid; + + if (IsSet(opts->specified_opts, SUBOPT_XID)) + errorConflictingDefElem(defel, pstate); + + /* Setting xid = NONE is treated as resetting xid */ + if (strcmp(xid_str, "none") == 0) + xid = InvalidTransactionId; + else + { + 
/* Parse the argument as TransactionId */ + xid = DatumGetTransactionId(DirectFunctionCall1(xidin, + CStringGetDatum(xid_str))); + + if (!TransactionIdIsNormal(xid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid transaction id: %s", xid_str))); + } + + opts->specified_opts |= SUBOPT_XID; + opts->xid = xid; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -464,6 +494,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, CharGetDatum(opts.twophase ? LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); + values[Anum_pg_subscription_subskipxid - 1] = + TransactionIdGetDatum(InvalidTransactionId); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -1083,6 +1115,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, break; } + case ALTER_SUBSCRIPTION_SKIP: + { + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to skip transaction"))); + + parse_subscription_options(pstate, stmt->options, SUBOPT_XID, &opts); + + /* ALTER SUBSCRIPTION ... 
SKIP supports only xid option */ + Assert(IsSet(opts.specified_opts, SUBOPT_XID)); + + values[Anum_pg_subscription_subskipxid - 1] = + TransactionIdGetDatum(opts.xid); + replaces[Anum_pg_subscription_subskipxid - 1] = true; + + update_tuple = true; + + break; + } + default: elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d", stmt->kind); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index bb015a8bbd..0a0961dbb5 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9954,6 +9954,15 @@ AlterSubscriptionStmt: (Node *)makeInteger(false), @1)); $$ = (Node *)n; } + | ALTER SUBSCRIPTION name SKIP definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_SKIP; + n->subname = $3; + n->options = $5; + $$ = (Node *)n; + } ; /***************************************************************************** diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c9af775bc1..0264e30112 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -136,6 +136,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" +#include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" @@ -257,6 +258,21 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; +/* + * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for + * the subscription if the user has specified subskipxid. Once we start skipping + * changes, we don't stop it until we skip all changes of the transaction even + * if pg_subscription is updated and MySubscription->skipxid gets changed or + * reset during that. 
Also, in streaming transaction cases, we don't skip + * receiving and spooling the changes, since we decide whether or not to skip + * applying the changes when starting to apply changes. At end of the transaction, + * we disable it and reset subskipxid. The timing of resetting subskipxid varies + * depending on commit or commit/rollback prepared case. Please refer to the + * comments in corresponding functions for details. + */ +static TransactionId skip_xid = InvalidTransactionId; +#define is_skipping_changes() (TransactionIdIsValid(skip_xid)) + /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; @@ -332,6 +348,13 @@ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int /* Common streaming function to apply all the spooled messages */ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); +/* Functions for skipping changes */ +static void maybe_start_skipping_changes(TransactionId xid); +static void stop_skipping_changes(bool commit, XLogRecPtr origin_lsn, + TimestampTz origin_timestamp); +static void clear_subscription_skip_xid(TransactionId xid, XLogRecPtr origin_lsn, + TimestampTz origin_timestamp); + /* Functions for apply error callback */ static void apply_error_callback(void *arg); static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts); @@ -791,6 +814,11 @@ apply_handle_begin(StringInfo s) remote_final_lsn = begin_data.final_lsn; + /* + * Enable skipping all changes of this transaction if specified. 
+ */ + maybe_start_skipping_changes(begin_data.xid); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -843,6 +871,11 @@ apply_handle_begin_prepare(StringInfo s) remote_final_lsn = begin_data.prepare_lsn; + /* + * Enable skipping all changes of this transaction if specified + */ + maybe_start_skipping_changes(begin_data.xid); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -856,6 +889,36 @@ apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data) { char gid[GIDSIZE]; + /* + * If we are skipping all changes of this transaction, we stop it but + * unlike commit, we do not clear subskipxid of pg_subscription catalog + * here and will do that at commit prepared or rollback prepared time. If + * we update the catalog and then prepare the transaction, the changes + * will be part of the prepared transaction. Even if we do that in + * reverse order, subskipxid will not be cleared but this transaction + * won't be resent in a case where the server crashes between them. + * + * subskipxid might be changed or cleared by the user before we receive + * COMMIT PREPARED or ROLLBACK PREPARED. But that's okay because this + * prepared transaction is empty. + * + * One might think that we can skip preparing the skipped transaction and + * also skip COMMIT PREPARED or ROLLBACK PREPARED by comparing the XID + * received as part of the message to subskipxid. But subskipxid could be + * changed by users between PREPARE and COMMIT PREPARED or ROLLBACK + * PREPARED. There was an idea to disallow users to change subskipxid + * while skipping changes. But we don't know when COMMIT PREPARED or + * ROLLBACK PREPARED comes and another conflict could occur in the + * meanwhile. If such another conflict occurs, we cannot skip the + * transaction by using subskipxid. 
Also, there was another idea to check + * whether the transaction has been prepared or not by checking GID, + * origin LSN, and origin timestamp of the prepared transaction but that + * doesn't seem worthwhile because it requires protocol changes, and + * skipping transactions shouldn't be common. + */ + if (is_skipping_changes()) + stop_skipping_changes(false, InvalidXLogRecPtr, 0); + /* * Compute unique GID for two_phase transactions. We don't use GID of * prepared transaction sent by server as that can lead to deadlock when @@ -901,9 +964,9 @@ apply_handle_prepare(StringInfo s) /* * Unlike commit, here, we always prepare the transaction even though no - * change has happened in this transaction. It is done this way because at - * commit prepared time, we won't know whether we have skipped preparing a - * transaction because of no change. + * change has happened in this transaction or all changes are skipped. It + * is done this way because at commit prepared time, we won't know whether + * we have skipped preparing a transaction because of no change. * * XXX, We can optimize such that at commit prepared time, we first check * whether we have prepared the transaction or not but that doesn't seem @@ -940,6 +1003,23 @@ apply_handle_commit_prepared(StringInfo s) logicalrep_read_commit_prepared(s, &prepare_data); set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time); + if (MySubscription->skipxid == prepare_data.xid) + { + /* + * Clear the subskipxid of pg_subscription catalog. This catalog + * update must be committed before finishing prepared transaction. + * Because otherwise, in a case where the server crashes between + * finishing prepared transaction and the catalog update, COMMIT + * PREPARED won't be resent but subskipxid is left. 
+ * + * Also, we must not update the replication origin LSN and timestamp + * while committing the catalog update so that COMMIT PREPARED will be + * resent in case of a crash immediately after the catalog update + * commit. + */ + clear_subscription_skip_xid(prepare_data.xid, InvalidXLogRecPtr, 0); + } + /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, gid, sizeof(gid)); @@ -981,6 +1061,17 @@ apply_handle_rollback_prepared(StringInfo s) logicalrep_read_rollback_prepared(s, &rollback_data); set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time); + if (MySubscription->skipxid == rollback_data.xid) + { + /* + * Same as COMMIT PREPARED, we must clear subskipxid of + * pg_subscription before rolling back the prepared transaction. + * Please see the comments in apply_handle_commit_prepared() for + * details. + */ + clear_subscription_skip_xid(rollback_data.xid, InvalidXLogRecPtr, 0); + } + /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid, gid, sizeof(gid)); @@ -1209,6 +1300,13 @@ apply_handle_stream_abort(StringInfo s) logicalrep_read_stream_abort(s, &xid, &subxid); + /* + * We don't expect the user to set the XID of the transaction that is + * rolled back but if the skip XID is set, clear it. + */ + if (MySubscription->skipxid == xid || MySubscription->skipxid == subxid) + clear_subscription_skip_xid(MySubscription->skipxid, InvalidXLogRecPtr, 0); + /* * If the two XIDs are the same, it's in fact abort of toplevel xact, so * just delete the files with serialized info. @@ -1331,6 +1429,9 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) remote_final_lsn = lsn; + /* Enable skipping all changes of this transaction if specified */ + maybe_start_skipping_changes(xid); + /* * Make sure the handle apply_dispatch methods are aware we're in a remote * transaction. 
@@ -1451,7 +1552,23 @@ apply_handle_stream_commit(StringInfo s) static void apply_handle_commit_internal(LogicalRepCommitData *commit_data) { - if (IsTransactionState()) + if (is_skipping_changes()) + { + /* + * If we are skipping all changes of this transaction, we stop it and + * clear subskipxid of pg_subscription. + */ + stop_skipping_changes(true, commit_data->end_lsn, + commit_data->committime); + + /* Clearing subskipxid must be committed */ + Assert(!IsTransactionState()); + + pgstat_report_stat(false); + + store_flush_position(commit_data->end_lsn); + } + else if (IsTransactionState()) { /* * Update origin state so we can restart streaming from correct @@ -2366,6 +2483,17 @@ apply_dispatch(StringInfo s) LogicalRepMsgType action = pq_getmsgbyte(s); LogicalRepMsgType saved_command; + /* + * Skip all data-modification changes if we're skipping changes of this + * transaction. + */ + if (is_skipping_changes() && + (action == LOGICAL_REP_MSG_INSERT || + action == LOGICAL_REP_MSG_UPDATE || + action == LOGICAL_REP_MSG_DELETE || + action == LOGICAL_REP_MSG_TRUNCATE)) + return; + /* * Set the current command being applied. Since this function can be * called recusively when applying spooled changes, save the current @@ -3661,6 +3789,135 @@ IsLogicalWorker(void) return MyLogicalRepWorker != NULL; } +/* + * Start skipping changes of the transaction if the given XID matches the + * transaction ID specified by subskipxid. + */ +static void +maybe_start_skipping_changes(TransactionId xid) +{ + Assert(!is_skipping_changes()); + Assert(!in_remote_transaction); + Assert(!in_streamed_transaction); + + if (MySubscription->skipxid != xid) + return; + + /* Start skipping all changes of this transaction */ + skip_xid = xid; + + ereport(LOG, + errmsg("start skipping logical replication transaction %u", + xid)); +} + +/* + * Stop skipping changes by resetting skip_xid. If clear_subskipxid is true, + * we also clear subskipxid of pg_subscription by setting it to InvalidTransactionId.
+ */ +static void +stop_skipping_changes(bool clear_subskipxid, XLogRecPtr origin_lsn, + TimestampTz origin_timestamp) +{ + Assert(is_skipping_changes()); + + ereport(LOG, + (errmsg("done skipping logical replication transaction %u", + skip_xid))); + + if (clear_subskipxid) + { + clear_subscription_skip_xid(skip_xid, origin_lsn, origin_timestamp); + + /* Make sure that clearing subskipxid is committed */ + if (IsTransactionState()) + CommitTransactionCommand(); + } + + /* Stop skipping changes */ + skip_xid = InvalidTransactionId; +} + +/* Clear subskipxid of pg_subscription catalog */ +static void +clear_subscription_skip_xid(TransactionId xid, XLogRecPtr origin_lsn, + TimestampTz origin_timestamp) +{ + Relation rel; + Form_pg_subscription subform; + HeapTuple tup; + bool started_tx = false; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + /* + * Protect subskipxid of pg_subscription from being concurrently updated + * while clearing it. + */ + LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, + AccessShareLock); + + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + + /* Fetch the existing tuple. */ + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, + ObjectIdGetDatum(MySubscription->oid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); + + /* Get subskipxid value */ + subform = (Form_pg_subscription) GETSTRUCT(tup); + + /* + * Update the subskipxid of the tuple to InvalidTransactionId. If the user + * has already changed subskipxid before we clear it, we don't update the + * catalog and don't advance the replication origin state. So in the + * worst case, if the server crashes before sending an acknowledgment of + * the flush position, the transaction will be sent again and the user + * needs to set subskipxid again.
We can reduce the possibility by + * logging a replication origin WAL record to advance the origin LSN + * instead but there is no way to advance origin timestamp and it + * doesn't seem to be worth it since it's a very minor case. + */ + if (subform->subskipxid == xid) + { + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* reset subskipxid */ + values[Anum_pg_subscription_subskipxid - 1] = + TransactionIdGetDatum(InvalidTransactionId); + replaces[Anum_pg_subscription_subskipxid - 1] = true; + + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = origin_lsn; + replorigin_session_origin_timestamp = origin_timestamp; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + } + + heap_freetuple(tup); + table_close(rel, NoLock); + + if (started_tx) + CommitTransactionCommand(); +} + /* Error callback to give more context info about the change being applied */ static void apply_error_callback(void *arg) diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 92ab95724d..29aea5b56b 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4301,6 +4301,10 @@ getSubscriptions(Archive *fout) ntups = PQntuples(res); + /* + * Get subscription fields. We don't fetch subskipxid as we don't + * include it in the dump. 
+ */ i_tableoid = PQfnumber(res, "tableoid"); i_oid = PQfnumber(res, "oid"); i_subname = PQfnumber(res, "subname"); diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 8587b19160..9cd478025d 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6029,7 +6029,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false}; + false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6063,11 +6063,13 @@ describeSubscriptions(const char *pattern, bool verbose) gettext_noop("Binary"), gettext_noop("Streaming")); - /* Two_phase is only supported in v15 and higher */ + /* Two_phase and skip XID are only supported in v15 and higher */ if (pset.sversion >= 150000) appendPQExpBuffer(&buf, - ", subtwophasestate AS \"%s\"\n", - gettext_noop("Two phase commit")); + ", subtwophasestate AS \"%s\"\n" + ", subskipxid AS \"%s\"\n", + gettext_noop("Two phase commit"), + gettext_noop("Skip XID")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 6bd33a06cb..b5689ec609 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1710,7 +1710,7 @@ psql_completion(const char *text, int start, int end) /* ALTER SUBSCRIPTION <name> */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny)) COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO", - "RENAME TO", "REFRESH PUBLICATION", "SET", + "RENAME TO", "REFRESH PUBLICATION", "SET", "SKIP", "ADD PUBLICATION", "DROP PUBLICATION"); /* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && @@ -1726,6 +1726,12 @@ psql_completion(const char *text, int start, int end) /* ALTER SUBSCRIPTION <name> SET ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) 
COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit"); + /* ALTER SUBSCRIPTION <name> SKIP */ + else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, "SKIP")) + COMPLETE_WITH("("); + /* ALTER SUBSCRIPTION <name> SKIP ( */ + else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) + COMPLETE_WITH("xid"); /* ALTER SUBSCRIPTION <name> SET PUBLICATION */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "PUBLICATION")) { diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 18c291289f..d4410da58f 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -67,6 +67,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW char subtwophasestate; /* Stream two-phase transactions */ + TransactionId subskipxid; /* All changes associated with + * this XID are skipped */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -103,6 +106,7 @@ typedef struct Subscription * binary format */ bool stream; /* Allow streaming in-progress transactions. 
*/ char twophasestate; /* Allow streaming two-phase transactions */ + TransactionId skipxid; /* All changes of the XID are skipped */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 413e7c85a1..ab3554f234 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3716,7 +3716,8 @@ typedef enum AlterSubscriptionType ALTER_SUBSCRIPTION_ADD_PUBLICATION, ALTER_SUBSCRIPTION_DROP_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, - ALTER_SUBSCRIPTION_ENABLED + ALTER_SUBSCRIPTION_ENABLED, + ALTER_SUBSCRIPTION_SKIP } AlterSubscriptionType; typedef struct AlterSubscriptionStmt diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 80aae83562..892b6739bc 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Skip XID | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f 
| f | d | 0 | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -93,11 +93,39 @@ ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2 ERROR: subscription "regress_doesnotexist" does not exist ALTER SUBSCRIPTION regress_testsub SET (create_slot = false); ERROR: unrecognized subscription parameter: "create_slot" +-- ok - valid xid +ALTER SUBSCRIPTION regress_testsub SKIP (xid = 3); +ALTER SUBSCRIPTION regress_testsub SKIP (xid = 4294967295); +SELECT subname, subskipxid FROM pg_subscription WHERE subname = 'regress_testsub'; + subname | subskipxid +-----------------+------------ + regress_testsub | 4294967295 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SKIP (xid = NONE); +SELECT subname, subskipxid FROM pg_subscription WHERE subname = 'regress_testsub'; + subname | subskipxid +-----------------+------------ + regress_testsub | 0 +(1 row) + +-- fail +ALTER SUBSCRIPTION regress_testsub SKIP (xid = 0); +ERROR: invalid transaction id: 0 +ALTER SUBSCRIPTION regress_testsub SKIP (xid = 1); +ERROR: invalid transaction id: 1 +ALTER SUBSCRIPTION regress_testsub SKIP (xid = 2); +ERROR: invalid transaction id: 2 +-- fail - must be superuser +SET SESSION AUTHORIZATION 'regress_subscription_user2'; +ALTER SUBSCRIPTION regress_testsub SKIP (xid = 100); +ERROR: must be owner of subscription regress_testsub +SET SESSION AUTHORIZATION 'regress_subscription_user'; \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | off | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | 
Streaming | Two phase commit | Skip XID | Synchronous commit | Conninfo +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+----------+--------------------+------------------------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | 0 | off | dbname=regress_doesnotexist2 (1 row) BEGIN; @@ -129,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | local | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Skip XID | Synchronous commit | Conninfo +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+----------+--------------------+------------------------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | 0 | local | dbname=regress_doesnotexist2 (1 row) -- rename back to keep the rest simple @@ -165,19 +193,19 @@ ERROR: binary requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Skip XID | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | 0 | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Skip XID | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | 0 | off | dbname=regress_doesnotexist (1 row) DROP SUBSCRIPTION regress_testsub; @@ -188,19 +216,19 @@ ERROR: streaming requires a Boolean value CREATE SUBSCRIPTION 
regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Skip XID | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | 0 | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Skip XID | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f 
| {testpub} | f | f | d | 0 | off | dbname=regress_doesnotexist (1 row) -- fail - publication already exists @@ -215,10 +243,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Skip XID | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | 0 | off | dbname=regress_doesnotexist (1 row) -- fail - publication used more then once @@ -233,10 +261,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist + List of subscriptions + 
Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Skip XID | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | 0 | off | dbname=regress_doesnotexist (1 row) DROP SUBSCRIPTION regress_testsub; @@ -270,10 +298,10 @@ ERROR: two_phase requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Skip XID | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | 0 | off | dbname=regress_doesnotexist (1 row) --fail - alter of two_phase option not supported. 
@@ -282,10 +310,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Skip XID | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | 0 | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -294,10 +322,10 @@ DROP SUBSCRIPTION regress_testsub; CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Skip XID | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | 0 | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index bd0f4af1e4..aa15d12d9d 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -72,6 +72,23 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = ''); ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2'; ALTER SUBSCRIPTION regress_testsub SET (create_slot = false); +-- ok - valid xid +ALTER SUBSCRIPTION regress_testsub SKIP (xid = 3); +ALTER SUBSCRIPTION regress_testsub SKIP (xid = 4294967295); +SELECT subname, subskipxid FROM pg_subscription WHERE subname = 'regress_testsub'; +ALTER SUBSCRIPTION regress_testsub SKIP (xid = NONE); +SELECT subname, subskipxid FROM pg_subscription WHERE subname = 'regress_testsub'; + +-- fail +ALTER SUBSCRIPTION regress_testsub SKIP (xid = 0); +ALTER SUBSCRIPTION regress_testsub SKIP (xid = 1); +ALTER SUBSCRIPTION regress_testsub SKIP (xid = 2); + +-- fail - must be superuser +SET SESSION AUTHORIZATION 
'regress_subscription_user2';
+ALTER SUBSCRIPTION regress_testsub SKIP (xid = 100);
+SET SESSION AUTHORIZATION 'regress_subscription_user';
+
 \dRs+
 
 BEGIN;
diff --git a/src/test/subscription/t/028_skip_xact.pl b/src/test/subscription/t/028_skip_xact.pl
new file mode 100644
index 0000000000..4c107fc8f5
--- /dev/null
+++ b/src/test/subscription/t/028_skip_xact.pl
@@ -0,0 +1,217 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Tests for skipping logical replication transactions
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use Test::More tests => 7;
+
+# Test skipping the transaction. This function must be called after the caller
+# has inserted data that conflicts with data on the subscriber. After waiting
+# for the subscription worker stats to be updated, we skip the transaction in
+# question using ALTER SUBSCRIPTION ... SKIP. Then, check that logical
+# replication keeps working by inserting $nonconflict_data on the publisher.
+sub test_skip_xact
+{
+	my ($node_publisher, $node_subscriber, $subname, $relname, $nonconflict_data,
+		$expected, $xid, $msg) = @_;
+
+	# Wait for worker error
+	$node_subscriber->poll_query_until(
+		'postgres',
+		qq[
+SELECT count(1) > 0
+FROM pg_stat_subscription_workers
+WHERE last_error_relid = '$relname'::regclass
+    AND subrelid IS NULL
+    AND last_error_command = 'INSERT'
+    AND last_error_xid = '$xid'
+    AND starts_with(last_error_message, 'duplicate key value violates unique constraint');
+]) or die "Timed out while waiting for worker error";
+
+	# Set skip xid
+	$node_subscriber->safe_psql(
+		'postgres',
+		"ALTER SUBSCRIPTION $subname SKIP (xid = '$xid')");
+
+	# Restart the subscriber so that logical replication restarts without delay
+	$node_subscriber->restart;
+
+	# Wait for the failed transaction to be skipped
+	$node_subscriber->poll_query_until(
+		'postgres',
+		"SELECT subskipxid = 0 FROM pg_subscription WHERE subname = '$subname'") or die "Timed out while waiting for the transaction to be skipped";
+
+	# Insert non-conflicting data
+	$node_publisher->safe_psql(
+		'postgres',
+		"INSERT INTO $relname VALUES $nonconflict_data");
+
+	$node_publisher->wait_for_catchup($subname);
+
+	# Check replicated data
+	my $res = $node_subscriber->safe_psql(
+		'postgres',
+		"SELECT count(*) FROM $relname");
+	is($res, $expected, $msg);
+}
+
+# Create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq[
+logical_decoding_work_mem = 64kB
+max_prepared_transactions = 10
+]);
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	qq[
+max_prepared_transactions = 10
+]);
+
+# The subscriber will enter an infinite error loop, so we don't want
+# to overflow the server log with error messages.
+$node_subscriber->append_conf('postgresql.conf',
+	qq[
+wal_retrieve_retry_interval = 2s
+]);
+$node_subscriber->start;
+
+# Initial table setup on both publisher and subscriber. On the subscriber, we
+# create the same tables but with primary keys. Also, insert some data that
+# will conflict with the data replicated from the publisher later.
+$node_publisher->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+CREATE TABLE test_tab (a int);
+CREATE TABLE test_tab_streaming (a int, b text);
+COMMIT;
+]);
+$node_subscriber->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+CREATE TABLE test_tab (a int primary key);
+CREATE TABLE test_tab_streaming (a int primary key, b text);
+INSERT INTO test_tab VALUES (1);
+INSERT INTO test_tab_streaming VALUES (1, md5(1::text));
+COMMIT;
+]);
+
+# Set up publications
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql(
+	'postgres',
+	qq[
+CREATE PUBLICATION tap_pub FOR TABLE test_tab;
+CREATE PUBLICATION tap_pub_streaming FOR TABLE test_tab_streaming;
+]);
+
+# Create subscriptions
+$node_subscriber->safe_psql(
+	'postgres',
+	qq[
+CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (two_phase = on);
+CREATE SUBSCRIPTION tap_sub_streaming CONNECTION '$publisher_connstr' PUBLICATION tap_pub_streaming WITH (two_phase = on, streaming = on);
+]);
+
+$node_publisher->wait_for_catchup('tap_sub');
+$node_publisher->wait_for_catchup('tap_sub_streaming');
+
+# Insert data into test_tab, raising an error on the subscriber due to a
+# violation of its unique constraint. Then skip the transaction.
+my $xid = $node_publisher->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+INSERT INTO test_tab VALUES (1);
+SELECT pg_current_xact_id()::xid;
+COMMIT;
+]);
+test_skip_xact($node_publisher, $node_subscriber, "tap_sub", "test_tab",
+	"(2)", "2", $xid,
+	"test skipping transaction");
+
+# Test for PREPARE and COMMIT PREPARED. Insert the same data into test_tab and
+# PREPARE the transaction, raising an error. Then skip the transaction.
+$xid = $node_publisher->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+INSERT INTO test_tab VALUES (1);
+SELECT pg_current_xact_id()::xid;
+PREPARE TRANSACTION 'gtx';
+COMMIT PREPARED 'gtx';
+]);
+test_skip_xact($node_publisher, $node_subscriber, "tap_sub", "test_tab",
+	"(3)", "3", $xid,
+	"test skipping prepare and commit prepared");
+
+# Test for PREPARE and ROLLBACK PREPARED
+$xid = $node_publisher->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+INSERT INTO test_tab VALUES (1);
+SELECT pg_current_xact_id()::xid;
+PREPARE TRANSACTION 'gtx';
+ROLLBACK PREPARED 'gtx';
+]);
+test_skip_xact($node_publisher, $node_subscriber, "tap_sub", "test_tab",
+	"(4)", "4", $xid,
+	"test skipping prepare and rollback prepared");
+
+# Test for STREAM COMMIT. Insert enough rows into test_tab_streaming to exceed the
+# 64kB logical_decoding_work_mem limit, also raising an error on the subscriber
+# while applying the spooled changes, for the same reason. Then skip the transaction.
+$xid = $node_publisher->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+INSERT INTO test_tab_streaming SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i);
+SELECT pg_current_xact_id()::xid;
+COMMIT;
+]);
+test_skip_xact($node_publisher, $node_subscriber, "tap_sub_streaming", "test_tab_streaming",
+	"(2, md5(2::text))", "2", $xid,
+	"test skipping stream-commit");
+
+# Test for STREAM PREPARE and COMMIT PREPARED
+$xid = $node_publisher->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+INSERT INTO test_tab_streaming SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i);
+SELECT pg_current_xact_id()::xid;
+PREPARE TRANSACTION 'gtx';
+COMMIT PREPARED 'gtx';
+]);
+test_skip_xact($node_publisher, $node_subscriber, "tap_sub_streaming", "test_tab_streaming",
+	"(3, md5(3::text))", "3", $xid,
+	"test skipping stream-prepare and commit prepared");
+
+# Test for STREAM PREPARE and ROLLBACK PREPARED
+$xid = $node_publisher->safe_psql(
+	'postgres',
+	qq[
+BEGIN;
+INSERT INTO test_tab_streaming SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i);
+SELECT pg_current_xact_id()::xid;
+PREPARE TRANSACTION 'gtx';
+ROLLBACK PREPARED 'gtx';
+]);
+test_skip_xact($node_publisher, $node_subscriber, "tap_sub_streaming", "test_tab_streaming",
+	"(4, md5(4::text))", "4", $xid,
+	"test skipping stream-prepare and rollback prepared");
+
+my $res = $node_subscriber->safe_psql(
+	'postgres',
+	"SELECT count(*) FROM pg_prepared_xacts");
+is($res, "0", "check all prepared transactions are resolved on the subscriber");
-- 
2.24.3 (Apple Git-128)