On Sat, Jul 17, 2021 at 12:02 AM Masahiko Sawada <sawada.m...@gmail.com> wrote: > > On Wed, Jul 14, 2021 at 5:14 PM Masahiko Sawada <sawada.m...@gmail.com> wrote: > > > > On Mon, Jul 12, 2021 at 8:52 PM Amit Kapila <amit.kapil...@gmail.com> wrote: > > > > > > On Mon, Jul 12, 2021 at 11:13 AM Masahiko Sawada <sawada.m...@gmail.com> > > > wrote: > > > > > > > > On Mon, Jul 12, 2021 at 1:15 PM Amit Kapila <amit.kapil...@gmail.com> > > > > wrote: > > > > > > > > > > On Mon, Jul 12, 2021 at 9:37 AM Alexey Lesovsky <lesov...@gmail.com> > > > > > wrote: > > > > > > > > > > > > On Mon, Jul 12, 2021 at 8:36 AM Amit Kapila > > > > > > <amit.kapil...@gmail.com> wrote: > > > > > >> > > > > > >> > > > > > > >> > Ok, looks nice. But I am curious how this will work in the case > > > > > >> > when there are two (or more) errors in the same subscription, > > > > > >> > but different relations? > > > > > >> > > > > > > >> > > > > > >> We can't proceed unless the first error is resolved, so there > > > > > >> shouldn't be multiple unresolved errors. > > > > > > > > > > > > > > > > > > Ok. I thought multiple errors are possible when many tables are > > > > > > initialized using parallel workers (with > > > > > > max_sync_workers_per_subscription > 1). > > > > > > > > > > > > > > > > Yeah, that is possible but that covers under the second condition > > > > > mentioned by me and in such cases I think we should have separate rows > > > > > for each tablesync. Is that right, Sawada-san or do you have something > > > > > else in mind? > > > > > > > > Yeah, I agree to have separate rows for each table sync. The table > > > > should not be processed by both the table sync worker and the apply > > > > worker at a time so the pair of subscription OID and relation OID will > > > > be unique. I think that we have a boolean column in the view, > > > > indicating whether the error entry is reported by the table sync > > > > worker or the apply worker, or maybe we also can have the action > > > > column show "TABLE SYNC" if the error is reported by the table sync > > > > worker. > > > > > > > > > > Or similar to backend_type (text) in pg_stat_activity, we can have > > > something like error_source (text) which will display apply worker or > > > tablesync worker? I think if we have this column then even if there is > > > a chance that both apply and sync worker operates on the same > > > relation, we can identify it via this column. > > > > Sounds good. I'll incorporate this in the next version patch that I'm > > planning to submit this week. > > Sorry, I could not make it this week. I'll submit them early next week. > While updating the patch I thought we need to have more design > discussion on two points of clearing error details after the error is > resolved: > > 1. How to clear apply worker errors. IIUC we've discussed that once > the apply worker skipped the transaction we leave the error entry > itself but clear its fields except for some fields such as failure > counts. But given that the stats messages could be lost, how can we > ensure to clear those error details? For table sync workers’ error, we > can have autovacuum workers periodically check entires of > pg_subscription_rel and clear the error entry if the table sync worker > completes table sync (i.g., checking if srsubstate = ‘r’). But there > is no such information for the apply workers and subscriptions. 
In > addition to sending the message clearing the error details just after > skipping the transaction, I thought that we can have apply workers > periodically send the message clearing the error details but it seems > not good.
I think that the motivation behind the idea of leaving error entries and clearing some of their fields is that users can check whether the error has been successfully resolved and the worker is working fine. But we can also check that in another way, for example, by checking the pg_stat_subscription view. So is it worth considering leaving the apply worker errors as they are?

> 2. Do we really want to leave the table sync worker errors even after the
> error is resolved and the table sync completes? Unlike the apply
> worker errors, the number of table sync worker errors could be very
> large, for example, if a subscriber subscribes to many tables. If we
> leave those errors in the stats view, they use more memory and
> could affect the performance of writing and reading the stats file. If such
> leftover table sync error entries are not helpful in practice, I think we can
> remove them rather than clear some of their fields. What do you think?

I've attached the updated version patch that incorporates all the comments I got so far, except for the clearing of error details mentioned above. After getting a consensus on those parts, I'll incorporate the idea into the patches.

Regards,

--
Masahiko Sawada
EDB: https://www.enterprisedb.com/
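PS: To summarize the intended workflow with the attached patches, here is a rough sketch based on the documentation example in the 3/3 patch (the subscription name test_sub and xid 740 are the illustrative values from that example, not fixed names, and the exact output can of course differ):

    -- Look up the failing transaction reported for the subscription.
    SELECT subname, relid::regclass, command, xid, last_failure_message
    FROM pg_stat_subscription_errors;

    -- Ask the apply worker to skip that whole remote transaction.
    -- Since all changes of the transaction are skipped, this should be
    -- used only as a last resort.
    ALTER SUBSCRIPTION test_sub SET (skip_xid = 740);

    -- subskipxid is cleared automatically once the transaction has been
    -- skipped; RESET is only needed to undo a skip_xid set by mistake.
    ALTER SUBSCRIPTION test_sub RESET (skip_xid);

After the transaction has been skipped and replication resumes, pg_stat_subscription should show the apply worker running again.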
From 4a2abc82db9ab37699f09df9be86f150c58db3cf Mon Sep 17 00:00:00 2001 From: Masahiko Sawada <sawada.mshk@gmail.com> Date: Mon, 28 Jun 2021 13:18:58 +0900 Subject: [PATCH v2 3/3] Add skip_xid option to ALTER SUBSCRIPTION. --- doc/src/sgml/logical-replication.sgml | 33 ++- doc/src/sgml/ref/alter_subscription.sgml | 47 +++- src/backend/catalog/pg_subscription.c | 10 + src/backend/commands/subscriptioncmds.c | 138 +++++++++-- src/backend/parser/gram.y | 11 +- src/backend/replication/logical/worker.c | 252 +++++++++++++++++---- src/include/catalog/pg_subscription.h | 4 + src/include/nodes/parsenodes.h | 4 +- src/test/regress/expected/subscription.out | 24 ++ src/test/regress/sql/subscription.sql | 20 ++ src/test/subscription/t/023_skip_xact.pl | 185 +++++++++++++++ 11 files changed, 645 insertions(+), 83 deletions(-) create mode 100644 src/test/subscription/t/023_skip_xact.pl diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 88646bc859..d222e64122 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -333,14 +333,41 @@ <para> A conflict will produce an error and will stop the replication; it must be resolved manually by the user. Details about the conflict can be found in - the subscriber's server log. + <link linkend="monitoring-pg-stat-subscription-errors"/> as well as the + subscriber's server log. </para> <para> The resolution can be done either by changing data on the subscriber so that it does not conflict with the incoming change or by skipping the - transaction that conflicts with the existing data. The transaction can be - skipped by calling the <link linkend="pg-replication-origin-advance"> + transaction that conflicts with the existing data. When a conflict produce + an error, it is shown in <structname>pg_stat_subscription_errors</structname> + view as follows: + </para> + + <programlisting> +postgres=# SELECT * FROM pg_stat_subscription_errors; + datname | subname | relid | command | xid | failure_source | failure_count | last_failure | last_failure_message +----------+----------+-------+---------+-----+----------------+---------------+-------------------------------+------------------------------------------------------------ + postgres | test_sub | 16385 | INSERT | 740 | apply | 1 | 2021-07-15 21:54:58.804595+00 | duplicate key value violates unique constraint "test_pkey" +</programlisting> + + <para> + and it is also shown in subscriber's server log as follows: + </para> + +<screen> +ERROR: duplicate key value violates unique constraint "test_pkey" +DETAIL: Key (id)=(1) already exists. +CONTEXT: during apply of "INSERT" for relation "public.test" in transaction with xid 740 committs 2021-07-15 21:54:58.802874+00 +</screen> + + <para> + The transaction ID to skip (740 in above cases) can be found in those outputs. + The transaction can be skipped by setting <replaceable>skip_xid</replaceable> to + the subscription by <command>ALTER SUBSCRIPTION ... SET</command>. + Alternatively, The transaction can also be skipped by calling the + <link linkend="pg-replication-origin-advance"> <function>pg_replication_origin_advance()</function></link> function with a <parameter>node_name</parameter> corresponding to the subscription name, and a position. 
The current position of origins can be seen in the diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index a6f994450d..e961f83eca 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -29,6 +29,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> REFRESH PUB ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ENABLE ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DISABLE ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) +ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RESET ( <replaceable class="parameter">subscription_parameter</replaceable> [, ... ] ) ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER } ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable> </synopsis> @@ -192,15 +193,47 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < <varlistentry> <term><literal>SET ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )</literal></term> + <term><literal>RESET ( <replaceable class="parameter">subscription_parameter</replaceable> [, ... ] )</literal></term> <listitem> <para> - This clause alters parameters originally set by - <xref linkend="sql-createsubscription"/>. See there for more - information. The parameters that can be altered - are <literal>slot_name</literal>, - <literal>synchronous_commit</literal>, - <literal>binary</literal>, and - <literal>streaming</literal>. + This clause sets or resets a subscription option. The parameters that can be + set are the parameters originally set by <xref linkend="sql-createsubscription"/>: + <literal>slot_name</literal>, <literal>synchronous_commit</literal>, + <literal>binary</literal>, <literal>streaming</literal>, and following + parameter: + </para> + <para> + <variablelist> + <varlistentry> + <term><literal>skip_xid</literal> (<type>xid</type>)</term> + <listitem> + <para> + If incoming data violates any constraints the logical replication + will stop until it is resolved (See + <xref linkend="logical-replication-conflicts"/> for the details). + The resolution can be done either by changing data on the + subscriber so that it doesn't conflict with incoming change or by + skipping the whole transaction. This option specifies transaction + ID that logical replication worker skips to apply. The logical + replication worker skips all data modification changes of the + specified transaction. Therefore, since it skips the whole + transaction including the changes that don't violate the constraint, + it should only be used as a last resort. This option has no effect + for the transaction that is already prepared with enabling + <literal>two_phase</literal> on susbscriber. After the logical + replication successfully skips the transaction, the transaction ID + (stored in + <structname>pg_subscription</structname>.<structfield>subskipxid</structfield>) + is cleared. 
+ </para> + </listitem> + </varlistentry> + </variablelist> + </para> + <para> + The parameters that can be reset are: <literal>streaming</literal>, + <literal>binary</literal>, <literal>synchronous_commit</literal>, + and <literal>skip_xid</literal>. </para> </listitem> </varlistentry> diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 25021e25a4..cb22cd7463 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -104,6 +104,16 @@ GetSubscription(Oid subid, bool missing_ok) Assert(!isnull); sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum)); + /* Get skip XID */ + datum = SysCacheGetAttr(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_subskipxid, + &isnull); + if (!isnull) + sub->skipxid = DatumGetTransactionId(datum); + else + sub->skipxid = InvalidTransactionId; + ReleaseSysCache(tup); return sub; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 239d263f83..b0a4b1de60 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -60,6 +60,7 @@ #define SUBOPT_BINARY 0x00000080 #define SUBOPT_STREAMING 0x00000100 #define SUBOPT_TWOPHASE_COMMIT 0x00000200 +#define SUBOPT_SKIP_XID 0x00000400 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -81,6 +82,7 @@ typedef struct SubOpts bool binary; bool streaming; bool twophase; + TransactionId skip_xid; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -99,7 +101,8 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, */ static void parse_subscription_options(ParseState *pstate, List *stmt_options, - bits32 supported_opts, SubOpts *opts) + bits32 supported_opts, SubOpts *opts, + bool is_reset) { ListCell *lc; @@ -128,12 +131,23 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->streaming = false; if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT)) opts->twophase = false; + if (IsSet(supported_opts, SUBOPT_SKIP_XID)) + opts->skip_xid = InvalidTransactionId; /* Parse options */ foreach(lc, stmt_options) { DefElem *defel = (DefElem *) lfirst(lc); + if (is_reset) + { + if (defel->arg != NULL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("RESET must not include values for parameters"))); + + } + if (IsSet(supported_opts, SUBOPT_CONNECT) && strcmp(defel->defname, "connect") == 0) { @@ -141,7 +155,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_CONNECT; - opts->connect = defGetBoolean(defel); + if (!is_reset) + opts->connect = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_ENABLED) && strcmp(defel->defname, "enabled") == 0) @@ -150,7 +165,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_ENABLED; - opts->enabled = defGetBoolean(defel); + if (!is_reset) + opts->enabled = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) && strcmp(defel->defname, "create_slot") == 0) @@ -159,7 +175,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_CREATE_SLOT; - opts->create_slot = defGetBoolean(defel); + if (!is_reset) + opts->create_slot = defGetBoolean(defel); } else if (IsSet(supported_opts, 
SUBOPT_SLOT_NAME) && strcmp(defel->defname, "slot_name") == 0) @@ -168,7 +185,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_SLOT_NAME; - opts->slot_name = defGetString(defel); + if (!is_reset) + opts->slot_name = defGetString(defel); /* Setting slot_name = NONE is treated as no slot name. */ if (strcmp(opts->slot_name, "none") == 0) @@ -181,7 +199,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_COPY_DATA; - opts->copy_data = defGetBoolean(defel); + if (!is_reset) + opts->copy_data = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) && strcmp(defel->defname, "synchronous_commit") == 0) @@ -190,12 +209,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT; - opts->synchronous_commit = defGetString(defel); + if (!is_reset) + { + opts->synchronous_commit = defGetString(defel); - /* Test if the given value is valid for synchronous_commit GUC. */ - (void) set_config_option("synchronous_commit", opts->synchronous_commit, - PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, - false, 0, false); + /* Test if the given value is valid for synchronous_commit GUC. */ + (void) set_config_option("synchronous_commit", opts->synchronous_commit, + PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, + false, 0, false); + } } else if (IsSet(supported_opts, SUBOPT_REFRESH) && strcmp(defel->defname, "refresh") == 0) @@ -204,7 +226,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_REFRESH; - opts->refresh = defGetBoolean(defel); + if (!is_reset) + opts->refresh = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_BINARY) && strcmp(defel->defname, "binary") == 0) @@ -213,7 +236,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_BINARY; - opts->binary = defGetBoolean(defel); + if (!is_reset) + opts->binary = defGetBoolean(defel); } else if (IsSet(supported_opts, SUBOPT_STREAMING) && strcmp(defel->defname, "streaming") == 0) @@ -222,7 +246,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_STREAMING; - opts->streaming = defGetBoolean(defel); + if (!is_reset) + opts->streaming = defGetBoolean(defel); } else if (strcmp(defel->defname, "two_phase") == 0) { @@ -243,7 +268,26 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT; - opts->twophase = defGetBoolean(defel); + if (!is_reset) + opts->twophase = defGetBoolean(defel); + } + else if (strcmp(defel->defname, "skip_xid") == 0) + { + if (!is_reset) + { + int64 arg; + TransactionId xid; + + arg = defGetInt64(defel); + xid = (TransactionId) arg; + if (arg < 0 || !TransactionIdIsNormal(xid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid transaction id"))); + opts->skip_xid = xid; + } + + opts->specified_opts |= SUBOPT_SKIP_XID; } else ereport(ERROR, @@ -414,7 +458,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | 
SUBOPT_TWOPHASE_COMMIT); - parse_subscription_options(pstate, stmt->options, supported_opts, &opts); + parse_subscription_options(pstate, stmt->options, supported_opts, &opts, + false); /* * Since creating a replication slot is not transactional, rolling back @@ -487,6 +532,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, CharGetDatum(opts.twophase ? LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); + nulls[Anum_pg_subscription_subskipxid - 1] = true; values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -883,14 +929,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, switch (stmt->kind) { - case ALTER_SUBSCRIPTION_OPTIONS: + case ALTER_SUBSCRIPTION_SET_OPTIONS: { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING); + SUBOPT_STREAMING | SUBOPT_SKIP_XID); parse_subscription_options(pstate, stmt->options, - supported_opts, &opts); + supported_opts, &opts, false); if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) { @@ -935,14 +981,60 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_substream - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_SKIP_XID)) + { + values[Anum_pg_subscription_subskipxid - 1] = + TransactionIdGetDatum(opts.skip_xid); + replaces[Anum_pg_subscription_subskipxid - 1] = true; + } + update_tuple = true; break; } + case ALTER_SUBSCRIPTION_RESET_OPTIONS: + { + supported_opts = (SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_STREAMING | + SUBOPT_BINARY | SUBOPT_SKIP_XID); + + parse_subscription_options(pstate, stmt->options, + supported_opts, &opts, true); + + if (IsSet(opts.specified_opts, SUBOPT_SYNCHRONOUS_COMMIT)) + { + values[Anum_pg_subscription_subsynccommit - 1] = + CStringGetTextDatum("off"); + replaces[Anum_pg_subscription_subsynccommit - 1] = true; + } + + if (IsSet(opts.specified_opts, SUBOPT_STREAMING)) + { + values[Anum_pg_subscription_substream - 1] = + BoolGetDatum(false); + replaces[Anum_pg_subscription_substream - 1] = true; + } + + if (IsSet(opts.specified_opts, SUBOPT_BINARY)) + { + values[Anum_pg_subscription_subbinary - 1] = + BoolGetDatum(false); + replaces[Anum_pg_subscription_subbinary - 1] = true; + } + + if (IsSet(opts.specified_opts, SUBOPT_SKIP_XID)) + { + nulls[Anum_pg_subscription_subskipxid - 1] = + replaces[Anum_pg_subscription_subskipxid - 1] = true; + } + + update_tuple = true; + break; + } case ALTER_SUBSCRIPTION_ENABLED: { parse_subscription_options(pstate, stmt->options, - SUBOPT_ENABLED, &opts); + SUBOPT_ENABLED, &opts, false); + Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED)); if (!sub->slotname && opts.enabled) @@ -977,7 +1069,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH; parse_subscription_options(pstate, stmt->options, - supported_opts, &opts); + supported_opts, &opts, false); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -1027,7 +1119,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, supported_opts |= SUBOPT_COPY_DATA; parse_subscription_options(pstate, stmt->options, - supported_opts, &opts); + supported_opts, &opts, false); publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname); values[Anum_pg_subscription_subpublications - 1] = @@ -1075,7 +1167,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errmsg("ALTER 
SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); parse_subscription_options(pstate, stmt->options, - SUBOPT_COPY_DATA, &opts); + SUBOPT_COPY_DATA, &opts, false); /* * The subscription option "two_phase" requires that diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 10da5c5c51..41a1d333f6 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9699,7 +9699,16 @@ AlterSubscriptionStmt: { AlterSubscriptionStmt *n = makeNode(AlterSubscriptionStmt); - n->kind = ALTER_SUBSCRIPTION_OPTIONS; + n->kind = ALTER_SUBSCRIPTION_SET_OPTIONS; + n->subname = $3; + n->options = $5; + $$ = (Node *)n; + } + | ALTER SUBSCRIPTION name RESET definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_RESET_OPTIONS; n->subname = $3; n->options = $5; $$ = (Node *)n; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7c2ec983bb..e09929206f 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -136,6 +136,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" +#include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" @@ -277,6 +278,16 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; +/* + * True if we're skipping changes of the specified transaction in + * MySubscription->skip_xid. Please note that we don’t skip receiving the changes + * since we decide whether or not to skip applying the changes when starting to + * apply. When stopping the skipping behavior, we reset the skip XID (subskipxid) + * in the pg_subscription and associate origin status to the transaction that resets + * the skip XID so that we can start streaming from the next transaction. + */ +static bool skipping_changes = false; + /* * Hash table for storing the streaming xid information along with shared file * set for streaming and subxact files. @@ -332,8 +343,7 @@ static void maybe_reread_subscription(void); /* prototype needed because of stream_commit */ static void apply_dispatch(StringInfo s); -static void apply_handle_commit_internal(StringInfo s, - LogicalRepCommitData *commit_data); +static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, TupleTableSlot *remoteslot); @@ -361,6 +371,9 @@ static void set_apply_error_context_rel(LogicalRepRelMapEntry *rel); static void reset_apply_error_context_rel(void); static void reset_apply_error_context_info(void); +static void maybe_start_skipping_changes(TransactionId xid); +static void stop_skipping_changes(XLogRecPtr origin_lsn, TimestampTz origin_committs); + /* * Should this worker apply changes for given relation. * @@ -858,6 +871,9 @@ apply_handle_begin(StringInfo s) remote_final_lsn = begin_data.final_lsn; + /* Start skipping all changes of this transaction if specified */ + maybe_start_skipping_changes(begin_data.xid); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -882,7 +898,18 @@ apply_handle_commit(StringInfo s) LSN_FORMAT_ARGS(commit_data.commit_lsn), LSN_FORMAT_ARGS(remote_final_lsn)))); - apply_handle_commit_internal(s, &commit_data); + /* + * Stop the skipping transaction if enabled. Otherwise, commit the + * changes that are just applied. 
+ */ + if (skipping_changes) + { + stop_skipping_changes(commit_data.end_lsn, commit_data.committime); + store_flush_position(commit_data.end_lsn); + in_remote_transaction = false; + } + else + apply_handle_commit_internal(&commit_data); /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(commit_data.end_lsn); @@ -910,6 +937,9 @@ apply_handle_begin_prepare(StringInfo s) remote_final_lsn = begin_data.prepare_lsn; + /* Start skipping all changes of this transaction if specified */ + maybe_start_skipping_changes(begin_data.xid); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -936,47 +966,55 @@ apply_handle_prepare(StringInfo s) LSN_FORMAT_ARGS(remote_final_lsn)))); /* - * Compute unique GID for two_phase transactions. We don't use GID of - * prepared transaction sent by server as that can lead to deadlock when - * we have multiple subscriptions from same node point to publications on - * the same node. See comments atop worker.c + * Prepare transaction if we haven't skipped the changes of this transaction. */ - TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, - gid, sizeof(gid)); + if (skipping_changes) + stop_skipping_changes(prepare_data.end_lsn, prepare_data.prepare_time); + else + { + /* + * Compute unique GID for two_phase transactions. We don't use GID of + * prepared transaction sent by server as that can lead to deadlock when + * we have multiple subscriptions from same node point to publications on + * the same node. See comments atop worker.c + */ + TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, + gid, sizeof(gid)); - /* - * Unlike commit, here, we always prepare the transaction even though no - * change has happened in this transaction. It is done this way because at - * commit prepared time, we won't know whether we have skipped preparing a - * transaction because of no change. - * - * XXX, We can optimize such that at commit prepared time, we first check - * whether we have prepared the transaction or not but that doesn't seem - * worthwhile because such cases shouldn't be common. - */ - begin_replication_step(); + /* + * Unlike commit, here, we always prepare the transaction even though no + * change has happened in this transaction. It is done this way because at + * commit prepared time, we won't know whether we have skipped preparing a + * transaction because of no change. + * + * XXX, We can optimize such that at commit prepared time, we first check + * whether we have prepared the transaction or not but that doesn't seem + * worthwhile because such cases shouldn't be common. + */ + begin_replication_step(); - /* - * BeginTransactionBlock is necessary to balance the EndTransactionBlock - * called within the PrepareTransactionBlock below. - */ - BeginTransactionBlock(); - CommitTransactionCommand(); /* Completes the preceding Begin command. */ + /* + * BeginTransactionBlock is necessary to balance the EndTransactionBlock + * called within the PrepareTransactionBlock below. + */ + BeginTransactionBlock(); + CommitTransactionCommand(); /* Completes the preceding Begin command. */ - /* - * Update origin state so we can restart streaming from correct position - * in case of crash. - */ - replorigin_session_origin_lsn = prepare_data.end_lsn; - replorigin_session_origin_timestamp = prepare_data.prepare_time; + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. 
+ */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.prepare_time; - PrepareTransactionBlock(gid); - end_replication_step(); - CommitTransactionCommand(); - pgstat_report_stat(false); + PrepareTransactionBlock(gid); + end_replication_step(); + CommitTransactionCommand(); + pgstat_report_stat(false); - store_flush_position(prepare_data.end_lsn); + } + store_flush_position(prepare_data.end_lsn); in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ @@ -1089,9 +1127,10 @@ apply_handle_origin(StringInfo s) { /* * ORIGIN message can only come inside streaming transaction or inside - * remote transaction and before any actual writes. + * remote transaction and before any actual writes unless we're skipping + * changes of the transaction. */ - if (!in_streamed_transaction && + if (!in_streamed_transaction && !skipping_changes && (!in_remote_transaction || (IsTransactionState() && !am_tablesync_worker()))) ereport(ERROR, @@ -1113,6 +1152,9 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("duplicate STREAM START message"))); + /* extract XID of the top-level transaction */ + stream_xid = logicalrep_read_stream_start(s, &first_segment); + /* * Start a transaction on stream start, this transaction will be committed * on the stream stop unless it is a tablesync worker in which case it @@ -1125,9 +1167,6 @@ apply_handle_stream_start(StringInfo s) /* notify handle methods we're processing a remote transaction */ in_streamed_transaction = true; - /* extract XID of the top-level transaction */ - stream_xid = logicalrep_read_stream_start(s, &first_segment); - if (!TransactionIdIsValid(stream_xid)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1209,6 +1248,7 @@ apply_handle_stream_abort(StringInfo s) errmsg_internal("STREAM ABORT message without STREAM STOP"))); logicalrep_read_stream_abort(s, &xid, &subxid); + maybe_start_skipping_changes(xid); /* * If the two XIDs are the same, it's in fact abort of toplevel xact, so @@ -1301,6 +1341,10 @@ apply_handle_stream_abort(StringInfo s) CommitTransactionCommand(); } + /* Stop the skipping transaction if enabled */ + if (skipping_changes) + stop_skipping_changes(InvalidXLogRecPtr, 0); + reset_apply_error_context_info(); } @@ -1311,11 +1355,11 @@ static void apply_handle_stream_commit(StringInfo s) { TransactionId xid; + LogicalRepCommitData commit_data; StringInfoData s2; int nchanges; char path[MAXPGPATH]; char *buffer = NULL; - LogicalRepCommitData commit_data; StreamXidHash *ent; MemoryContext oldcxt; BufFile *fd; @@ -1329,8 +1373,13 @@ apply_handle_stream_commit(StringInfo s) apply_error_callback_arg.remote_xid = xid; apply_error_callback_arg.committs = commit_data.committime; + remote_final_lsn = commit_data.commit_lsn; + elog(DEBUG1, "received commit for streamed transaction %u", xid); + /* Start skipping all changes of this transaction if specified */ + maybe_start_skipping_changes(xid); + /* Make sure we have an open transaction */ begin_replication_step(); @@ -1362,13 +1411,12 @@ apply_handle_stream_commit(StringInfo s) MemoryContextSwitchTo(oldcxt); - remote_final_lsn = commit_data.commit_lsn; - /* * Make sure the handle apply_dispatch methods are aware we're in a remote * transaction. 
*/ in_remote_transaction = true; + pgstat_report_activity(STATE_RUNNING, NULL); end_replication_step(); @@ -1441,7 +1489,17 @@ apply_handle_stream_commit(StringInfo s) elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", nchanges, path); - apply_handle_commit_internal(s, &commit_data); + if (skipping_changes) + { + stop_skipping_changes(commit_data.end_lsn, commit_data.committime); + store_flush_position(commit_data.end_lsn); + in_remote_transaction = false; + } + else + { + /* commit the streamed transaction */ + apply_handle_commit_internal(&commit_data); + } /* unlink the files with serialized changes and subxact info */ stream_cleanup_files(MyLogicalRepWorker->subid, xid); @@ -1450,7 +1508,6 @@ apply_handle_stream_commit(StringInfo s) process_syncing_tables(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); - reset_apply_error_context_info(); } @@ -1458,7 +1515,7 @@ apply_handle_stream_commit(StringInfo s) * Helper function for apply_handle_commit and apply_handle_stream_commit. */ static void -apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data) +apply_handle_commit_internal(LogicalRepCommitData *commit_data) { if (IsTransactionState()) { @@ -2330,6 +2387,17 @@ apply_dispatch(StringInfo s) LogicalRepMsgType action = pq_getmsgbyte(s); ErrorContextCallback errcallback; + /* + * Skip all data-modification changes if we're skipping changes of this + * transaction. + */ + if (skipping_changes && + (action == LOGICAL_REP_MSG_INSERT || + action == LOGICAL_REP_MSG_UPDATE || + action == LOGICAL_REP_MSG_DELETE || + action == LOGICAL_REP_MSG_TRUNCATE)) + return; + /* * Push apply error context callback. Other fields will be filled * during applying the change. @@ -3789,3 +3857,91 @@ reset_logicalrep_error_context_rel(void) apply_error_callback_arg.relname = NULL; } } + +/* + * Start skipping changes of the transaction if the given XID matches the + * transaction ID specified by skip_xid option. + */ +static void +maybe_start_skipping_changes(TransactionId xid) +{ + Assert(!skipping_changes); + Assert(!in_remote_transaction); + Assert(!in_streamed_transaction); + + if (!TransactionIdIsValid(MySubscription->skipxid) || + MySubscription->skipxid != xid) + return; + + skipping_changes = true; + ereport(LOG, + errmsg("start skipping logical replication transaction with xid %u", + xid)); +} + +/* + * Stop skipping changes and reset the skip XID. + * + * If origin_lsn and origin_committs are valid, we set origin state to the + * transaction commit that resets the skip XID so that we can start streaming + * from the transaction next to the one that we just skipped. + */ +static void +stop_skipping_changes(XLogRecPtr origin_lsn, TimestampTz origin_committs) +{ + Relation rel; + HeapTuple tup; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + Assert(skipping_changes); + Assert(TransactionIdIsValid(MySubscription->skipxid)); + Assert(in_remote_transaction); + + /* Stop skipping changes */ + skipping_changes = false; + ereport(LOG, + errmsg("done skipping logical replication transaction with xid %u", + MySubscription->skipxid)); + + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* Update the system catalog to reset the skip XID */ + if (!IsTransactionState()) + StartTransactionCommand(); + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + + /* Fetch the existing tuple. 
*/ + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, + TransactionIdGetDatum(MySubscription->oid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); + + /* Set subskipxid to null */ + nulls[Anum_pg_subscription_subskipxid - 1] = true; + replaces[Anum_pg_subscription_subskipxid - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + + heap_freetuple(tup); + table_close(rel, RowExclusiveLock); + + if (!XLogRecPtrIsInvalid(origin_lsn)) + { + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = origin_lsn; + replorigin_session_origin_timestamp = origin_committs; + } + + CommitTransactionCommand(); + pgstat_report_stat(false); +} diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 21061493ea..e5a95a02ec 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -67,6 +67,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW char subtwophasestate; /* Stream two-phase transactions */ + TransactionId subskipxid BKI_FORCE_NULL; /* All changes associated with + this XID are skipped */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -103,6 +106,7 @@ typedef struct Subscription * binary format */ bool stream; /* Allow streaming in-progress transactions. */ char twophasestate; /* Allow streaming two-phase transactions */ + TransactionId skipxid; /* All changes of the XID are skipped */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index def9651b34..2c6d321284 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3658,7 +3658,8 @@ typedef struct CreateSubscriptionStmt typedef enum AlterSubscriptionType { - ALTER_SUBSCRIPTION_OPTIONS, + ALTER_SUBSCRIPTION_SET_OPTIONS, + ALTER_SUBSCRIPTION_RESET_OPTIONS, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_ADD_PUBLICATION, @@ -3675,6 +3676,7 @@ typedef struct AlterSubscriptionStmt char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ + TransactionId skip_xid; /* XID to skip */ } AlterSubscriptionStmt; typedef struct DropSubscriptionStmt diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index ad6b4e4bd3..11c9da4162 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -283,6 +283,30 @@ ERROR: unrecognized subscription parameter: "two_phase" ALTER SUBSCRIPTION regress_testsub SET (streaming = true); ERROR: cannot set streaming = true for two-phase enabled subscription ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +-- it works +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 3); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 4294967295); +ALTER SUBSCRIPTION regress_testsub RESET (skip_xid, synchronous_commit, binary, streaming); +ALTER SUBSCRIPTION regress_testsub RESET (skip_xid); +-- fail - invalid XID +ALTER SUBSCRIPTION 
regress_testsub SET (skip_xid = 4294967296); +ERROR: invalid transaction id +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = -1); +ERROR: invalid transaction id +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 0); +ERROR: invalid transaction id +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 1); +ERROR: invalid transaction id +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 2); +ERROR: invalid transaction id +-- fail - RESET unsupporting +ALTER SUBSCRIPTION regress_testsub RESET (connect); +ERROR: unrecognized subscription parameter: "connect" +ALTER SUBSCRIPTION regress_testsub RESET (enabled); +ERROR: unrecognized subscription parameter: "enabled" +-- fail - RESET must not include values +ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit = off); +ERROR: RESET must not include values for parameters \dRs+ List of subscriptions Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index b732871407..1db0a6d22f 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -217,6 +217,26 @@ ALTER SUBSCRIPTION regress_testsub SET (streaming = true); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +-- it works +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 3); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 4294967295); +ALTER SUBSCRIPTION regress_testsub RESET (skip_xid, synchronous_commit, binary, streaming); +ALTER SUBSCRIPTION regress_testsub RESET (skip_xid); + +-- fail - invalid XID +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 4294967296); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = -1); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 0); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 1); +ALTER SUBSCRIPTION regress_testsub SET (skip_xid = 2); + +-- fail - RESET unsupporting +ALTER SUBSCRIPTION regress_testsub RESET (connect); +ALTER SUBSCRIPTION regress_testsub RESET (enabled); + +-- fail - RESET must not include values +ALTER SUBSCRIPTION regress_testsub RESET (synchronous_commit = off); + \dRs+ DROP SUBSCRIPTION regress_testsub; diff --git a/src/test/subscription/t/023_skip_xact.pl b/src/test/subscription/t/023_skip_xact.pl new file mode 100644 index 0000000000..7b29828cce --- /dev/null +++ b/src/test/subscription/t/023_skip_xact.pl @@ -0,0 +1,185 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test skipping logical replication transactions +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 8; + +sub test_subscription_error +{ + my ($node, $expected, $source, $relname, $msg) = @_; + + # Wait for the error statistics to be updated. + $node->poll_query_until( + 'postgres', qq[ +SELECT count(1) > 0 FROM pg_stat_subscription_errors +WHERE relid = '$relname'::regclass AND failure_source = '$source'; +]) or die "Timed out while waiting for statistics to be updated"; + + my $result = $node->safe_psql( + 'postgres', + qq[ +SELECT datname, subname, command, relid::regclass, failure_source, failure_count > 0 +FROM pg_stat_subscription_errors +WHERE relid = '$relname'::regclass AND failure_source = '$source'; +]); + is($result, $expected, $msg); +} + +# Create publisher node. 
+my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node. +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); + +# don't overflow the server log with error messages. +$node_subscriber->append_conf('postgresql.conf', + 'wal_retrieve_retry_interval = 5s'); +$node_subscriber->start; + +# Initial table setup on both publisher and subscriber. On subscriber we create +# the same tables but with primary keys. Also, insert some data that will conflict +# with the data replicated from publisher later. +$node_publisher->safe_psql('postgres', + q[ +BEGIN; +CREATE TABLE test_tab1 (a int); +CREATE TABLE test_tab2 (a int); +CREATE TABLE test_tab_streaming (a int, b text); +INSERT INTO test_tab1 VALUES (1); +INSERT INTO test_tab2 VALUES (1); +COMMIT; +]); +$node_subscriber->safe_psql('postgres', + q[ +BEGIN; +CREATE TABLE test_tab1 (a int primary key); +CREATE TABLE test_tab2 (a int primary key); +CREATE TABLE test_tab_streaming (a int primary key, b text); +INSERT INTO test_tab2 VALUES (1); +INSERT INTO test_tab_streaming SELECT 10000, md5(10000::text); +COMMIT; +]); + +# Check if there is no subscription errors before starting logical replication. +my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM pg_stat_subscription_errors"); +is($result, qq(0), 'check no subscription error'); + +# Setup logical replication. +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + q[ +CREATE PUBLICATION tap_pub FOR TABLE test_tab1, test_tab2; +CREATE PUBLICATION tap_pub_streaming FOR TABLE test_tab_streaming; +]); + +# Start logical replication. The table sync for test_tab2 on tap_sub will enter +# infinite error due to violating the unique constraint. +my $appname = 'tap_sub'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (two_phase = on);"); +my $appname_streaming = 'tap_sub_streaming'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub_streaming CONNECTION '$publisher_connstr application_name=$appname_streaming' PUBLICATION tap_pub_streaming WITH (streaming = on);"); + + +$node_publisher->wait_for_catchup($appname); +$node_publisher->wait_for_catchup($appname_streaming); + +# Also wait for initial table sync for test_tab1 and test_tab_streaming to finish. +$node_subscriber->poll_query_until('postgres', + q[ +SELECT count(1) = 2 FROM pg_subscription_rel +WHERE srrelid in ('test_tab1'::regclass, 'test_tab_streaming'::regclass) AND srsubstate = 'r' +]) or die "Timed out while waiting for subscriber to synchronize data"; + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(a) FROM test_tab1"); +is($result, q(1), 'check initial data was copied to subscriber'); + +# Insert more data to test_tab1, raising an error on the subscriber due to violating +# the unique constraint on test_tab1. +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab1 VALUES (1)"); + +# Insert enough rows to test_tab_streaming to exceed the 64kB limit, also raising an +# error on the subscriber for the same reason. 
+$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab_streaming SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i);"); + +# Check both two errors on tap_sub subscription are reported. +test_subscription_error($node_subscriber, qq(postgres|tap_sub|INSERT|test_tab1|apply|t), + 'apply', 'test_tab1', 'error reporting by the apply worker'); +test_subscription_error($node_subscriber, qq(postgres|tap_sub||test_tab2|tablesync|t), + 'tablesync', 'test_tab2', 'error reporting by the table sync worker'); +test_subscription_error($node_subscriber, qq(postgres|tap_sub_streaming|INSERT|test_tab_streaming|apply|t), + 'apply', 'test_tab_streaming', 'error reporting by the apply worker'); + +# Set XIDs of the transactions in question to the subscriptions to skip. +my $skip_xid1 = $node_subscriber->safe_psql( + 'postgres', + "SELECT xid FROM pg_stat_subscription_errors WHERE relid = 'test_tab1'::regclass"); +my $skip_xid2 = $node_subscriber->safe_psql( + 'postgres', + "SELECT xid FROM pg_stat_subscription_errors WHERE relid = 'test_tab_streaming'::regclass"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET (skip_xid = $skip_xid1)"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_streaming SET (skip_xid = $skip_xid2)"); + +# Restart the subscriber to restart logical replication without interval. +$node_subscriber->restart; + +# Wait for the transaction in question is skipped. +$node_subscriber->poll_query_until( + 'postgres', + q[ +SELECT count(1) = 2 FROM pg_subscription +WHERE subname in ('tap_sub', 'tap_sub_streaming') AND subskipxid IS NULL +]) or die "Timed out while waiting for the transaction to be skipped"; + +# Insert data to test_tab1 that doesn't conflict. +$node_publisher->safe_psql( + 'postgres', + "INSERT INTO test_tab1 VALUES (2)"); + +# Also, insert data to test_tab_streaming that doesn't conflict. +$node_publisher->safe_psql( + 'postgres', + "INSERT INTO test_tab_streaming VALUES (10001, md5(10001::text))"); + +$node_publisher->wait_for_catchup($appname); +$node_publisher->wait_for_catchup($appname_streaming); + +# Check the data is successfully replicated after skipping the transaction. +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM test_tab1"); +is($result, q(1 +2), "subscription gets changes after skipped transaction"); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM test_tab_streaming"); +is($result, q(2), "subscription gets changes after skipped transaction"); + +# Check if the view doesn't show any entries after dropping the subscription. +$node_subscriber->safe_psql( + 'postgres', + q[ +DROP SUBSCRIPTION tap_sub; +DROP SUBSCRIPTION tap_sub_streaming; +]); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM pg_stat_subscription_errors"); +is($result, q(0), 'no error after dropping subscription'); -- 2.24.3 (Apple Git-128)
From 8578720819ea56aed4993bc926402b67179868de Mon Sep 17 00:00:00 2001 From: Masahiko Sawada <sawada.mshk@gmail.com> Date: Mon, 28 Jun 2021 13:21:58 +0900 Subject: [PATCH v2 1/3] Add errcontext to errors of the applying logical replication changes. --- src/backend/commands/tablecmds.c | 7 + src/backend/replication/logical/proto.c | 49 +++++ src/backend/replication/logical/worker.c | 220 ++++++++++++++++++++--- src/include/replication/logicalproto.h | 1 + src/include/replication/logicalworker.h | 2 + 5 files changed, 257 insertions(+), 22 deletions(-) diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 46b108caa6..4662ec4787 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -78,6 +78,7 @@ #include "partitioning/partbounds.h" #include "partitioning/partdesc.h" #include "pgstat.h" +#include "replication/logicalworker.h" #include "rewrite/rewriteDefine.h" #include "rewrite/rewriteHandler.h" #include "rewrite/rewriteManip.h" @@ -1897,6 +1898,9 @@ ExecuteTruncateGuts(List *explicit_rels, continue; } + /* Set logical replication error callback info if necessary */ + set_logicalrep_error_context_rel(rel); + /* * Build the lists of foreign tables belonging to each foreign server * and pass each list to the foreign data wrapper's callback function, @@ -2004,6 +2008,9 @@ ExecuteTruncateGuts(List *explicit_rels, pgstat_count_truncate(rel); } + /* Reset logical replication error callback info */ + reset_logicalrep_error_context_rel(); + /* Now go through the hash table, and truncate foreign tables */ if (ft_htab) { diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 13c8c3bd5b..833a97aec9 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -1109,3 +1109,52 @@ logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, *xid = pq_getmsgint(in, 4); *subxid = pq_getmsgint(in, 4); } + +/* + * get string representing LogicalRepMsgType. 
+ */ +char * +logicalrep_message_type(LogicalRepMsgType action) +{ + switch (action) + { + case LOGICAL_REP_MSG_BEGIN: + return "BEGIN"; + case LOGICAL_REP_MSG_COMMIT: + return "COMMIT"; + case LOGICAL_REP_MSG_INSERT: + return "INSERT"; + case LOGICAL_REP_MSG_UPDATE: + return "UPDATE"; + case LOGICAL_REP_MSG_DELETE: + return "DELETE"; + case LOGICAL_REP_MSG_TRUNCATE: + return "TRUNCATE"; + case LOGICAL_REP_MSG_RELATION: + return "RELATION"; + case LOGICAL_REP_MSG_TYPE: + return "TYPE"; + case LOGICAL_REP_MSG_ORIGIN: + return "ORIGIN"; + case LOGICAL_REP_MSG_MESSAGE: + return "MESSAGE"; + case LOGICAL_REP_MSG_STREAM_START: + return "STREAM START"; + case LOGICAL_REP_MSG_STREAM_END: + return "STREAM END"; + case LOGICAL_REP_MSG_STREAM_ABORT: + return "STREAM ABORT"; + case LOGICAL_REP_MSG_STREAM_COMMIT: + return "STREAM COMMIT"; + case LOGICAL_REP_MSG_BEGIN_PREPARE: + return "BEGIN PREPARE"; + case LOGICAL_REP_MSG_PREPARE: + return "PREPARE"; + case LOGICAL_REP_MSG_COMMIT_PREPARED: + return "COMMIT PREPARED"; + case LOGICAL_REP_MSG_ROLLBACK_PREPARED: + return "ROLLBACK PREPARED"; + } + + elog(ERROR, "invalid logical replication message type \"%c\"", action); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b9a7a7ffbb..c23713468c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -221,6 +221,27 @@ typedef struct ApplyExecutionData PartitionTupleRouting *proute; /* partition routing info */ } ApplyExecutionData; +/* Struct for saving and restoring apply information */ +typedef struct ApplyErrCallbackArg +{ + LogicalRepMsgType command; /* 0 if invalid */ + + /* Local relation information */ + char *nspname; /* used for error context */ + char *relname; /* used for error context */ + + TransactionId remote_xid; + TimestampTz committs; +} ApplyErrCallbackArg; +static ApplyErrCallbackArg apply_error_callback_arg = +{ + .command = 0, + .relname = NULL, + .nspname = NULL, + .remote_xid = InvalidTransactionId, + .committs = 0, +}; + /* * Stream xid hash entry. Whenever we see a new xid we create this entry in the * xidhash and along with it create the streaming file and store the fileset handle. @@ -333,6 +354,10 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, /* Compute GID for two_phase transactions */ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); +static void apply_error_callback(void *arg); +static void set_apply_error_context_rel(LogicalRepRelMapEntry *rel); +static void reset_apply_error_context_rel(void); +static void reset_apply_error_context_info(void); /* * Should this worker apply changes for given relation. 
@@ -826,6 +851,8 @@ apply_handle_begin(StringInfo s) LogicalRepBeginData begin_data; logicalrep_read_begin(s, &begin_data); + apply_error_callback_arg.remote_xid = begin_data.xid; + apply_error_callback_arg.committs = begin_data.committime; remote_final_lsn = begin_data.final_lsn; @@ -859,6 +886,7 @@ apply_handle_commit(StringInfo s) process_syncing_tables(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -876,6 +904,7 @@ apply_handle_begin_prepare(StringInfo s) errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); logicalrep_read_begin_prepare(s, &begin_data); + apply_error_callback_arg.remote_xid = begin_data.xid; remote_final_lsn = begin_data.prepare_lsn; @@ -894,6 +923,8 @@ apply_handle_prepare(StringInfo s) char gid[GIDSIZE]; logicalrep_read_prepare(s, &prepare_data); + apply_error_callback_arg.remote_xid = prepare_data.xid; + apply_error_callback_arg.committs = prepare_data.prepare_time; if (prepare_data.prepare_lsn != remote_final_lsn) ereport(ERROR, @@ -950,6 +981,7 @@ apply_handle_prepare(StringInfo s) process_syncing_tables(prepare_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -962,6 +994,8 @@ apply_handle_commit_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_commit_prepared(s, &prepare_data); + apply_error_callback_arg.remote_xid = prepare_data.xid; + apply_error_callback_arg.committs = prepare_data.commit_time; /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, @@ -989,6 +1023,7 @@ apply_handle_commit_prepared(StringInfo s) process_syncing_tables(prepare_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -1001,6 +1036,7 @@ apply_handle_rollback_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_rollback_prepared(s, &rollback_data); + apply_error_callback_arg.remote_xid = rollback_data.xid; /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid, @@ -1038,6 +1074,7 @@ apply_handle_rollback_prepared(StringInfo s) process_syncing_tables(rollback_data.rollback_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -1094,6 +1131,8 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid transaction ID in streamed replication transaction"))); + apply_error_callback_arg.remote_xid = stream_xid; + /* * Initialize the xidhash table if we haven't yet. This will be used for * the entire duration of the apply worker so create it in permanent @@ -1150,6 +1189,7 @@ apply_handle_stream_stop(StringInfo s) MemoryContextReset(LogicalStreamingContext); pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); } /* @@ -1173,7 +1213,10 @@ apply_handle_stream_abort(StringInfo s) * just delete the files with serialized info. 
*/ if (xid == subxid) + { + apply_error_callback_arg.remote_xid = xid; stream_cleanup_files(MyLogicalRepWorker->subid, xid); + } else { /* @@ -1198,6 +1241,7 @@ apply_handle_stream_abort(StringInfo s) char path[MAXPGPATH]; StreamXidHash *ent; + apply_error_callback_arg.remote_xid = subxid; subidx = -1; begin_replication_step(); subxact_info_read(MyLogicalRepWorker->subid, xid); @@ -1222,6 +1266,7 @@ apply_handle_stream_abort(StringInfo s) cleanup_subxact_info(); end_replication_step(); CommitTransactionCommand(); + reset_apply_error_context_info(); return; } @@ -1253,6 +1298,8 @@ apply_handle_stream_abort(StringInfo s) end_replication_step(); CommitTransactionCommand(); } + + reset_apply_error_context_info(); } /* @@ -1277,6 +1324,8 @@ apply_handle_stream_commit(StringInfo s) errmsg_internal("STREAM COMMIT message without STREAM STOP"))); xid = logicalrep_read_stream_commit(s, &commit_data); + apply_error_callback_arg.remote_xid = xid; + apply_error_callback_arg.committs = commit_data.committime; elog(DEBUG1, "received commit for streamed transaction %u", xid); @@ -1399,6 +1448,8 @@ apply_handle_stream_commit(StringInfo s) process_syncing_tables(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); + + reset_apply_error_context_info(); } /* @@ -1518,6 +1569,9 @@ apply_handle_insert(StringInfo s) return; } + /* Set relation for error callback */ + set_apply_error_context_rel(rel); + /* Initialize the executor state. */ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -1541,6 +1595,9 @@ apply_handle_insert(StringInfo s) finish_edata(edata); + /* Reset relation for error callback */ + reset_apply_error_context_rel(); + logicalrep_rel_close(rel, NoLock); end_replication_step(); @@ -1639,6 +1696,9 @@ apply_handle_update(StringInfo s) return; } + /* Set relation for error callback */ + set_apply_error_context_rel(rel); + /* Check if we can do the update. */ check_relation_updatable(rel); @@ -1692,6 +1752,9 @@ apply_handle_update(StringInfo s) finish_edata(edata); + /* Reset relation for error callback */ + reset_apply_error_context_rel(); + logicalrep_rel_close(rel, NoLock); end_replication_step(); @@ -1795,6 +1858,9 @@ apply_handle_delete(StringInfo s) return; } + /* Set relation for error callback */ + set_apply_error_context_rel(rel); + /* Check if we can do the delete. */ check_relation_updatable(rel); @@ -1820,6 +1886,9 @@ apply_handle_delete(StringInfo s) finish_edata(edata); + /* Reset relation for error callback */ + reset_apply_error_context_rel(); + logicalrep_rel_close(rel, NoLock); end_replication_step(); @@ -2224,6 +2293,9 @@ apply_handle_truncate(StringInfo s) * Even if we used CASCADE on the upstream primary we explicitly default * to replaying changes without further cascading. This might be later * changeable with a user specified option. + * + * Both namespace and relation name for error callback will be set in + * ExecuteTruncateGuts(). */ ExecuteTruncateGuts(rels, relids, @@ -2254,44 +2326,54 @@ static void apply_dispatch(StringInfo s) { LogicalRepMsgType action = pq_getmsgbyte(s); + ErrorContextCallback errcallback; + + /* + * Push apply error context callback. Other fields will be filled + * during applying the change. 
+ */ + apply_error_callback_arg.command = action; + errcallback.callback = apply_error_callback; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; switch (action) { case LOGICAL_REP_MSG_BEGIN: apply_handle_begin(s); - return; + break; case LOGICAL_REP_MSG_COMMIT: apply_handle_commit(s); - return; + break; case LOGICAL_REP_MSG_INSERT: apply_handle_insert(s); - return; + break; case LOGICAL_REP_MSG_UPDATE: apply_handle_update(s); - return; + break; case LOGICAL_REP_MSG_DELETE: apply_handle_delete(s); - return; + break; case LOGICAL_REP_MSG_TRUNCATE: apply_handle_truncate(s); - return; + break; case LOGICAL_REP_MSG_RELATION: apply_handle_relation(s); - return; + break; case LOGICAL_REP_MSG_TYPE: apply_handle_type(s); - return; + break; case LOGICAL_REP_MSG_ORIGIN: apply_handle_origin(s); - return; + break; case LOGICAL_REP_MSG_MESSAGE: @@ -2300,45 +2382,48 @@ apply_dispatch(StringInfo s) * Although, it could be used by other applications that use this * output plugin. */ - return; + break; case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); - return; + break; case LOGICAL_REP_MSG_STREAM_END: apply_handle_stream_stop(s); - return; + break; case LOGICAL_REP_MSG_STREAM_ABORT: apply_handle_stream_abort(s); - return; + break; case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); - return; + break; case LOGICAL_REP_MSG_BEGIN_PREPARE: apply_handle_begin_prepare(s); - return; + break; case LOGICAL_REP_MSG_PREPARE: apply_handle_prepare(s); - return; + break; case LOGICAL_REP_MSG_COMMIT_PREPARED: apply_handle_commit_prepared(s); - return; + break; case LOGICAL_REP_MSG_ROLLBACK_PREPARED: apply_handle_rollback_prepared(s); - return; + break; + + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid logical replication message type \"%c\"", action))); } - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("invalid logical replication message type \"%c\"", - action))); + /* Pop the error context stack */ + error_context_stack = errcallback.previous; } /* @@ -3571,3 +3656,94 @@ IsLogicalWorker(void) { return MyLogicalRepWorker != NULL; } + +static void +apply_error_callback(void *arg) +{ + StringInfoData buf; + + if (apply_error_callback_arg.command == 0) + return; + + initStringInfo(&buf); + appendStringInfo(&buf, _("during apply of \"%s\""), + logicalrep_message_type(apply_error_callback_arg.command)); + + if (apply_error_callback_arg.relname) + appendStringInfo(&buf, _(" for relation \"%s.%s\""), + apply_error_callback_arg.nspname, + apply_error_callback_arg.relname); + + if (TransactionIdIsNormal(apply_error_callback_arg.remote_xid)) + appendStringInfo(&buf, _(" in transaction with xid %u committs %s"), + apply_error_callback_arg.remote_xid, + apply_error_callback_arg.committs == 0 + ? 
"(unset)" + : timestamptz_to_str(apply_error_callback_arg.committs)); + + errcontext("%s", buf.data); +} + +/* Set relation information for apply error callback */ +static void +set_apply_error_context_rel(LogicalRepRelMapEntry *rel) +{ + apply_error_callback_arg.nspname = rel->remoterel.nspname; + apply_error_callback_arg.relname = rel->remoterel.relname; +} + +/* Reset relation information for apply error callback */ +static void +reset_apply_error_context_rel(void) +{ + apply_error_callback_arg.nspname = NULL; + apply_error_callback_arg.relname = NULL; +} + +/* Reset all information for apply error callback */ +static void +reset_apply_error_context_info(void) +{ + apply_error_callback_arg.command = 0; + apply_error_callback_arg.remote_xid = InvalidTransactionId; + apply_error_callback_arg.committs = 0; + reset_apply_error_context_rel(); +} + +/* + * Set relation information for error callback by the given relation. + * Both set_logicalrep_error_context_rel() and + * reset_logicalrep_error_context_rel() functions are intended to be + * used by functions outside of logical replication module where don't + * use LogicalRepRelMapEntry. + * + * The caller must call reset_logicalrep_error_context_rel() after use + * so we free the memory used for names. + */ +void +set_logicalrep_error_context_rel(Relation rel) +{ + if (IsLogicalWorker()) + { + apply_error_callback_arg.nspname = + get_namespace_name(RelationGetNamespace(rel)); + apply_error_callback_arg.relname = + pstrdup(RelationGetRelationName(rel)); + } +} + +/* Reset relation information for error callback set */ +void +reset_logicalrep_error_context_rel(void) +{ + if (IsLogicalWorker()) + { + if (apply_error_callback_arg.nspname) + pfree(apply_error_callback_arg.nspname); + apply_error_callback_arg.nspname = NULL; + + if (apply_error_callback_arg.relname) + pfree(apply_error_callback_arg.relname); + apply_error_callback_arg.relname = NULL; + } +} diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 63de90d94a..c78a4409bc 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -242,5 +242,6 @@ extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid); extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid); +extern char *logicalrep_message_type(LogicalRepMsgType action); #endif /* LOGICAL_PROTO_H */ diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 2ad61a001a..d3e8514ffd 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -15,5 +15,7 @@ extern void ApplyWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); +extern void set_logicalrep_error_context_rel(Relation rel); +extern void reset_logicalrep_error_context_rel(void); #endif /* LOGICALWORKER_H */ -- 2.24.3 (Apple Git-128)
From e513f9b2e1e8c5b05c1aafe08a2794445e280501 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada <sawada.mshk@gmail.com> Date: Fri, 16 Jul 2021 23:10:22 +0900 Subject: [PATCH v2 2/3] Add pg_stat_logical_replication_error statistics view. --- doc/src/sgml/monitoring.sgml | 126 +++++++ src/backend/catalog/system_views.sql | 15 + src/backend/postmaster/pgstat.c | 451 +++++++++++++++++++++++ src/backend/replication/logical/worker.c | 48 ++- src/backend/utils/adt/pgstatfuncs.c | 106 ++++++ src/backend/utils/error/elog.c | 16 + src/include/catalog/pg_proc.dat | 8 + src/include/pgstat.h | 73 ++++ src/include/utils/elog.h | 1 + src/test/regress/expected/rules.out | 12 + 10 files changed, 853 insertions(+), 3 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 74a58a916c..06c4b0c8a5 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -346,6 +346,15 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser </entry> </row> + <row> + <entry><structname>pg_stat_subscription_errors</structname><indexterm><primary>pg_stat_subscription_errors</primary></indexterm></entry> + <entry>One row per error happened on subscription, showing information about + the subscription errors. + See <link linkend="monitoring-pg-stat-subscription-errors"> + <structname>pg_stat_subscription_errors</structname></link> for details. + </entry> + </row> + <row> <entry><structname>pg_stat_ssl</structname><indexterm><primary>pg_stat_ssl</primary></indexterm></entry> <entry>One row per connection (regular and replication), showing information about @@ -3050,6 +3059,123 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i </sect2> + <sect2 id="monitoring-pg-stat-subscription-errors"> + <title><structname>pg_stat_subscription_errors</structname></title> + + <indexterm> + <primary>pg_stat_subscription_errors</primary> + </indexterm> + + <para> + The <structname>pg_stat_subscription_errors</structname> view will contain one + row per subscription error, and additional rows for errors reported by workers + handling the initial data copy of the subscribed tables. + </para> + + <table id="pg-stat-subscription-errors" xreflabel="pg_stat_subscription-errors"> + <title><structname>pg_stat_subscription_errors</structname> View</title> + <tgroup cols="1"> + <thead> + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + Column Type + </para> + <para> + Description + </para></entry> + </row> + </thead> + + <tbody> + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>datname</structfield> <type>name</type> + </para> + <para> + Name of the database in which the subscription is created. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>subname</structfield> <type>name</type> + </para> + <para> + Name of the subscription + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>relid</structfield> <type>oid</type> + </para> + <para> + OID of the relation that the worker is processing when the + error happened. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>command</structfield> <type>text</type> + </para> + <para> + Name of command being applied when the error happened. 
+ </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>xid</structfield> <type>xid</type> + </para> + <para> + Transaction ID of publisher node being applied when the error + happened. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>failure_source</structfield> <type>text</type> + </para> + <para> + Type of worker reported the error: <literal>apply</literal> or + <literal>tablesync</literal>. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>failure_count</structfield> <type>uint8</type> + </para> + <para> + Number of times error happened on the worker. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>last_failure</structfield> <type>timestamp with time zone</type> + </para> + <para> + Time at which the last error happened. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>last_failure_message</structfield> <type>text</type> + </para> + <para> + Error message which is reported last failure time. + </para></entry> + </row> + </tbody> + </tgroup> + </table> + + </sect2> + <sect2 id="monitoring-pg-stat-ssl-view"> <title><structname>pg_stat_ssl</structname></title> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 55f6e3711d..6031f063d2 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1257,3 +1257,18 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subslotname, subsynccommit, subpublications) ON pg_subscription TO public; + +CREATE VIEW pg_stat_subscription_errors AS + SELECT + d.datname, + s.subname, + e.relid, + e.command, + e.xid, + e.failure_source, + e.failure_count, + e.last_failure, + e.last_failure_message + FROM pg_subscription as s, + LATERAL pg_stat_get_subscription_error(s.oid) as e + JOIN pg_database as d ON (e.datid = d.oid); diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 11702f2a80..fc79b724e8 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -41,6 +41,7 @@ #include "catalog/partition.h" #include "catalog/pg_database.h" #include "catalog/pg_proc.h" +#include "catalog/pg_subscription.h" #include "common/ip.h" #include "executor/instrument.h" #include "libpq/libpq.h" @@ -106,6 +107,7 @@ #define PGSTAT_TAB_HASH_SIZE 512 #define PGSTAT_FUNCTION_HASH_SIZE 512 #define PGSTAT_REPLSLOT_HASH_SIZE 32 +#define PGSTAT_SUBSCRIPTION_ERR_HASH_SIZE 32 /* ---------- @@ -279,6 +281,7 @@ static PgStat_GlobalStats globalStats; static PgStat_WalStats walStats; static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; static HTAB *replSlotStatHash = NULL; +static HTAB *subscriptionErrHash = NULL; /* * List of OIDs of databases we need to write out. 
If an entry is InvalidOid, @@ -320,6 +323,11 @@ static bool pgstat_db_requested(Oid databaseid); static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it); static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts); +static PgStat_StatSubErrEntry * pgstat_get_subscription_error_entry(Oid subid, + bool create); +static PgStat_StatSubRelErrEntry * pgstat_get_subscription_rel_error_entry(PgStat_StatSubErrEntry *suberrent, + Oid subrelid); + static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg); static void pgstat_send_funcstats(void); static void pgstat_send_slru(void); @@ -358,6 +366,8 @@ static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len static void pgstat_recv_connstat(PgStat_MsgConn *msg, int len); static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len); static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len); +static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionErr *msg, int len); +static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len); /* ------------------------------------------------------------ * Public functions called from postmaster follow @@ -1134,6 +1144,52 @@ pgstat_vacuum_stat(void) } } + /* + * Search for all the dead subscriptions in stats hashtable and tell the + * stats collector to drop them. + */ + if (subscriptionErrHash) + { + PgStat_MsgSubscriptionPurge s_msg; + PgStat_StatSubErrEntry *suberrent; + HTAB *htab; + + htab = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid); + + s_msg.m_nentries = 0; + hash_seq_init(&hstat, subscriptionErrHash); + while ((suberrent = (PgStat_StatSubErrEntry *) hash_seq_search(&hstat)) != NULL) + { + CHECK_FOR_INTERRUPTS(); + + if (hash_search(htab, (void *) &(suberrent->subid), HASH_FIND, NULL) == NULL) + s_msg.m_subids[s_msg.m_nentries++] = suberrent->subid; + + /* If the message is full, send it out and reinitialize to empty */ + if (msg.m_nentries >= PGSTAT_NUM_SUBSCRIPTIONPURGE) + { + len = offsetof(PgStat_MsgSubscriptionPurge, m_subids[0]) + + s_msg.m_nentries * sizeof(Oid); + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONPURGE); + pgstat_send(&msg, len); + s_msg.m_nentries = 0; + } + } + + /* Send the rest */ + if (s_msg.m_nentries > 0) + { + len = offsetof(PgStat_MsgSubscriptionPurge, m_subids[0]) + + s_msg.m_nentries * sizeof(Oid); + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONPURGE); + pgstat_send(&msg, len); + } + + hash_destroy(htab); + } + /* * Lookup our own database entry; if not found, nothing more to do. */ @@ -1863,6 +1919,56 @@ pgstat_report_replslot_drop(const char *slotname) pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); } +/* ---------- + * pgstat_report_subscription_error() - + * + * Tell the collector about error of subscription. 
+ * ---------- + */ +void +pgstat_report_subscription_error(Oid subid, Oid subrelid, Oid relid, + LogicalRepMsgType command, TransactionId xid, + const char *errmsg) +{ + PgStat_MsgSubscriptionErr msg; + int len; + + len = offsetof(PgStat_MsgSubscriptionErr, m_errmsg[0]) + strlen(errmsg); + Assert(len < PGSTAT_MAX_MSG_SIZE); + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERR); + msg.m_subid = subid; + msg.m_subrelid = subrelid; + msg.m_databaseid = MyDatabaseId; + msg.m_relid = relid; + msg.m_clear = false; + msg.m_command = command; + msg.m_xid = xid; + msg.m_last_failure = GetCurrentTimestamp(); + strlcpy(msg.m_errmsg, errmsg, PGSTAT_SUBSCRIPTIONERR_MSGLEN); + + pgstat_send(&msg, len); +} + +/* ---------- + * pgstat_clear_subscription_error() - + * + * Tell the collector about clear the error of subscription. + * ---------- + */ +void +pgstat_clear_subscription_error(Oid subid, Oid subrelid) +{ + PgStat_MsgSubscriptionErr msg; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERR); + msg.m_subid = subid; + msg.m_subrelid = subrelid; + msg.m_clear = true; + + pgstat_send(&msg, offsetof(PgStat_MsgSubscriptionErr, m_clear) + sizeof(bool)); +} + /* ---------- * pgstat_ping() - * @@ -2895,6 +3001,23 @@ pgstat_fetch_replslot(NameData slotname) return pgstat_get_replslot_entry(slotname, false); } +/* + * --------- + * pgstat_fetch_subscription_error() - + * + * Support function for the SQL-callable pgstat* functions. Returns + * a pointer to the logical replication error struct. + * --------- + */ +PgStat_StatSubErrEntry * +pgstat_fetch_subscription_error(Oid subid) +{ + backend_read_statsfile(); + + return pgstat_get_subscription_error_entry(subid, false); +} + + /* * Shut down a single backend's statistics reporting at process exit. * @@ -3424,6 +3547,14 @@ PgstatCollectorMain(int argc, char *argv[]) pgstat_recv_connstat(&msg.msg_conn, len); break; + case PGSTAT_MTYPE_SUBSCRIPTIONERR: + pgstat_recv_subscription_error(&msg.msg_subscriptionerr, len); + break; + + case PGSTAT_MTYPE_SUBSCRIPTIONPURGE: + pgstat_recv_subscription_purge(&msg.msg_subscriptionpurge, len); + break; + default: break; } @@ -3725,6 +3856,41 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) } } + /* + * Write subscription error structs + */ + if (subscriptionErrHash) + { + PgStat_StatSubErrEntry *suberrent; + + hash_seq_init(&hstat, subscriptionErrHash); + while ((suberrent = (PgStat_StatSubErrEntry *) hash_seq_search(&hstat)) != NULL) + { + PgStat_StatSubRelErrEntry *relerrent; + HASH_SEQ_STATUS relhstat; + long nrels = hash_get_num_entries(suberrent->suberrors); + + /* Skip this subscription if not have any errors */ + if (suberrent->suberrors == NULL) + continue; + + fputc('S', fpout); + rc = fwrite(suberrent, sizeof(PgStat_StatSubErrEntry), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + + /* the number of errors follows */ + rc = fwrite(&nrels, sizeof(long), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + + hash_seq_init(&relhstat, suberrent->suberrors); + while ((relerrent = (PgStat_StatSubRelErrEntry *) hash_seq_search(&relhstat)) != NULL) + { + rc = fwrite(relerrent, sizeof(PgStat_StatSubRelErrEntry), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + } + } + } + /* * No more output to be done. Close the temp file and replace the old * pgstat.stat with it. 
The ferror() check replaces testing for error @@ -4184,6 +4350,96 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) break; } + /* + * 'S' A PgStat_StatSubErrEntry struct followed by the number + * of errors and PgStat_StatSubRelErrEntry structs describing a + * subscription errors. + */ + case 'S': + { + PgStat_StatSubErrEntry suberrbuf; + PgStat_StatSubErrEntry *suberrent; + long nrels; + + /* Read the subscription entry */ + if (fread(&suberrbuf, 1, sizeof(PgStat_StatSubErrEntry), fpin) != + sizeof(PgStat_StatSubErrEntry)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + /* Create hash table if we don't have it already. */ + if (subscriptionErrHash == NULL) + { + HASHCTL hash_ctl; + + hash_ctl.keysize = sizeof(Oid); + hash_ctl.entrysize = sizeof(PgStat_StatSubErrEntry); + hash_ctl.hcxt = pgStatLocalContext; + subscriptionErrHash = hash_create("Subscription error hash", + PGSTAT_SUBSCRIPTION_ERR_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + } + + /* Enter the subscription error entry */ + suberrent = + (PgStat_StatSubErrEntry *) hash_search(subscriptionErrHash, + (void *) &(suberrbuf.subid), + HASH_ENTER, NULL); + suberrent->suberrors = NULL; + + /* Read the number of errors in the subscription */ + if (fread(&nrels, 1, sizeof(long), fpin) != sizeof(long)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + for (int i = 0; i < nrels; i++) + { + PgStat_StatSubRelErrEntry *subrelent; + PgStat_StatSubRelErrEntry subrelbuf; + + if (fread(&subrelbuf, 1, sizeof(PgStat_StatSubRelErrEntry), fpin) != + sizeof(PgStat_StatSubRelErrEntry)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + if (suberrent->suberrors == NULL) + { + HASHCTL hash_ctl; + + hash_ctl.keysize = sizeof(Oid); + hash_ctl.entrysize = sizeof(PgStat_StatSubRelErrEntry); + hash_ctl.hcxt = pgStatLocalContext; + suberrent->suberrors = hash_create("Subscription relation error hash", + PGSTAT_SUBSCRIPTION_ERR_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + } + + /* Enter the error information to the subscription hash */ + subrelent = + (PgStat_StatSubRelErrEntry *) hash_search(suberrent->suberrors, + (void *) &(subrelbuf.subrelid), + HASH_ENTER, NULL); + + memcpy(subrelent, &subrelbuf, sizeof(PgStat_StatSubRelErrEntry)); + } + + break; + } + case 'E': goto done; @@ -4526,6 +4782,50 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, } break; + /* + * 'S' A PgStat_StatSubErrEntry struct followed by the number + * of errors and PgStat_StatSubRelErrEntry structs describing a + * subscription errors. + */ + case 'S': + { + PgStat_StatSubErrEntry mySubErrs; + PgStat_StatSubRelErrEntry subrelbuf; + long nrels; + + if (fread(&mySubErrs, 1, sizeof(PgStat_StatSubErrEntry), fpin) + != sizeof(PgStat_StatSubErrEntry)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + FreeFile(fpin); + return false; + } + + if (fread(&nrels, 1, sizeof(long), fpin) != sizeof(long)) + { + ereport(pgStatRunningInCollector ? 
LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + for (int i = 0; i < nrels; i++) + { + if (fread(&subrelbuf, 1, sizeof(PgStat_StatSubRelErrEntry), fpin) != + sizeof(PgStat_StatSubRelErrEntry)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + } + } + + break; + case 'E': goto done; @@ -4716,6 +5016,7 @@ pgstat_clear_snapshot(void) pgStatLocalContext = NULL; pgStatDBHash = NULL; replSlotStatHash = NULL; + subscriptionErrHash = NULL; /* * Historically the backend_status.c facilities lived in this file, and @@ -5650,6 +5951,76 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len) } } +/* ---------- + * pgstat_recv_subscription_error() - + * + * Process a SUBSCRIPTIONERR message. + * ---------- + */ +static void +pgstat_recv_subscription_error(PgStat_MsgSubscriptionErr *msg, int len) +{ + PgStat_StatSubErrEntry *suberrent; + PgStat_StatSubRelErrEntry *relerrent; + + /* Get subscription errors */ + suberrent = pgstat_get_subscription_error_entry(msg->m_subid, true); + Assert(suberrent); + + /* Get the error entry of the relation */ + relerrent = pgstat_get_subscription_rel_error_entry(suberrent, + msg->m_subrelid); + Assert(relerrent); + + if (msg->m_clear) + { + /* reset all fields except for databaseid and failure_count */ + relerrent->relid = InvalidOid; + relerrent->command = 0; + relerrent->xid = InvalidTransactionId; + relerrent->last_failure = 0; + relerrent->errmsg[0] = '\0'; + } + else + { + relerrent->databaseid = msg->m_databaseid; + relerrent->relid = msg->m_relid; + relerrent->command = msg->m_command; + relerrent->xid = msg->m_xid; + relerrent->failure_count++; + relerrent->last_failure = msg->m_last_failure; + strlcpy(relerrent->errmsg, msg->m_errmsg, PGSTAT_SUBSCRIPTIONERR_MSGLEN); + } +} + +/* ---------- + * pgstat_recv_subscription_purge() - + * + * Process a SUBSCRIPTIONPURGE message. + * ---------- + */ +static void +pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len) +{ + if (subscriptionErrHash != NULL) + return; + + for (int i = 0; i < msg->m_nentries; i++) + { + PgStat_StatSubErrEntry *suberrent; + + suberrent = hash_search(subscriptionErrHash, (void *) &(msg->m_subids[i]), + HASH_FIND, NULL); + + /* Cleanup the hash table for errors */ + if (suberrent->suberrors != NULL) + hash_destroy(suberrent->suberrors); + + (void) hash_search(subscriptionErrHash, (void *) &(msg->m_subids[i]), + HASH_REMOVE, NULL); + } +} + /* ---------- * pgstat_write_statsfile_needed() - * @@ -5747,6 +6118,86 @@ pgstat_get_replslot_entry(NameData name, bool create) return slotent; } +/* ---------- + * pgstat_get_subscription_error_entry + * + * Return the entry of subscription error entry with the subscription OID. + * Return NULL if not found and the caller didn't request to create it. + * + * create tells whether to create the new slot entry if it is not found. + * ---------- + */ +static PgStat_StatSubErrEntry * +pgstat_get_subscription_error_entry(Oid subid, bool create) +{ + PgStat_StatSubErrEntry *suberrent; + HASHACTION action = (create ? 
HASH_ENTER : HASH_FIND); + bool found; + + if (subscriptionErrHash == NULL) + { + HASHCTL hash_ctl; + + hash_ctl.keysize = sizeof(Oid); + hash_ctl.entrysize = sizeof(PgStat_StatSubErrEntry); + subscriptionErrHash = hash_create("Subscription error hash", + PGSTAT_SUBSCRIPTION_ERR_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS); + } + + suberrent = (PgStat_StatSubErrEntry *) hash_search(subscriptionErrHash, + (void *) &subid, + action, &found); + + if (create && !found) + suberrent->suberrors = NULL; + + return suberrent; +} + +/* ---------- + * pgstat_get_subscription_rel_error_entry + * + * Lookup and create the subscription error entry with 'relid' from 'suberrent'. + * ---------- + */ +static PgStat_StatSubRelErrEntry * +pgstat_get_subscription_rel_error_entry(PgStat_StatSubErrEntry *suberrent, + Oid subrelid) +{ + PgStat_StatSubRelErrEntry *relerrent; + bool found; + + if (suberrent->suberrors == NULL) + { + HASHCTL hash_ctl; + + hash_ctl.keysize = sizeof(Oid); + hash_ctl.entrysize = sizeof(PgStat_StatSubRelErrEntry); + suberrent->suberrors = hash_create("Subscription relation error hash", + PGSTAT_SUBSCRIPTION_ERR_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS); + } + + relerrent = (PgStat_StatSubRelErrEntry *) hash_search(suberrent->suberrors, + (void *) &subrelid, + HASH_ENTER, &found); + + /* initialize fields */ + if (!found) + { + relerrent->command = 0; + relerrent->xid = InvalidTransactionId; + relerrent->failure_count = 0; + relerrent->last_failure = 0; + relerrent->errmsg[0] = '\0'; + } + + return relerrent; +} + /* ---------- * pgstat_reset_replslot * diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c23713468c..7c2ec983bb 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -227,6 +227,7 @@ typedef struct ApplyErrCallbackArg LogicalRepMsgType command; /* 0 if invalid */ /* Local relation information */ + Oid relid; /* used for error reporting */ char *nspname; /* used for error context */ char *relname; /* used for error context */ @@ -236,6 +237,7 @@ typedef struct ApplyErrCallbackArg static ApplyErrCallbackArg apply_error_callback_arg = { .command = 0, + .relid = InvalidOid, .relname = NULL, .nspname = NULL, .remote_xid = InvalidTransactionId, @@ -3524,8 +3526,26 @@ ApplyWorkerMain(Datum main_arg) { char *syncslotname; - /* This is table synchronization worker, call initial sync. */ - syncslotname = LogicalRepSyncTableStart(&origin_startpos); + PG_TRY(); + { + /* This is table synchronization worker, call initial sync. */ + syncslotname = LogicalRepSyncTableStart(&origin_startpos); + } + PG_CATCH(); + { + elog(NOTICE, "errmsg \"%s\"", + geterrmessage()); + + /* report the table sync error */ + pgstat_report_subscription_error(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relid, + 0, + InvalidTransactionId, + geterrmessage()); + PG_RE_THROW(); + } + PG_END_TRY(); /* allocate slot name in long-lived context */ myslotname = MemoryContextStrdup(ApplyContext, syncslotname); @@ -3643,7 +3663,24 @@ ApplyWorkerMain(Datum main_arg) } /* Run the main loop. 
*/ - LogicalRepApplyLoop(origin_startpos); + PG_TRY(); + { + LogicalRepApplyLoop(origin_startpos); + } + PG_CATCH(); + { + /* report the apply error */ + if (apply_error_callback_arg.command != 0) + pgstat_report_subscription_error(MySubscription->oid, + InvalidOid, + apply_error_callback_arg.relid, + apply_error_callback_arg.command, + apply_error_callback_arg.remote_xid, + geterrmessage()); + + PG_RE_THROW(); + } + PG_END_TRY(); proc_exit(0); } @@ -3688,6 +3725,7 @@ apply_error_callback(void *arg) static void set_apply_error_context_rel(LogicalRepRelMapEntry *rel) { + apply_error_callback_arg.relid = rel->localreloid; apply_error_callback_arg.nspname = rel->remoterel.nspname; apply_error_callback_arg.relname = rel->remoterel.relname; } @@ -3696,6 +3734,7 @@ set_apply_error_context_rel(LogicalRepRelMapEntry *rel) static void reset_apply_error_context_rel(void) { + apply_error_callback_arg.relid = InvalidOid; apply_error_callback_arg.nspname = NULL; apply_error_callback_arg.relname = NULL; } @@ -3725,6 +3764,7 @@ set_logicalrep_error_context_rel(Relation rel) { if (IsLogicalWorker()) { + apply_error_callback_arg.relid = RelationGetRelid(rel); apply_error_callback_arg.nspname = get_namespace_name(RelationGetNamespace(rel)); apply_error_callback_arg.relname = @@ -3738,6 +3778,8 @@ reset_logicalrep_error_context_rel(void) { if (IsLogicalWorker()) { + apply_error_callback_arg.relid = InvalidOid; + if (apply_error_callback_arg.nspname) pfree(apply_error_callback_arg.nspname); apply_error_callback_arg.nspname = NULL; diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index f0e09eae4d..b155c30fcf 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -24,6 +24,8 @@ #include "pgstat.h" #include "postmaster/bgworker_internals.h" #include "postmaster/postmaster.h" +#include "replication/logicalproto.h" +#include "replication/logicalworker.h" #include "replication/slot.h" #include "storage/proc.h" #include "storage/procarray.h" @@ -2380,3 +2382,107 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } + +/* + * Get the logical replication error for the given subscription. 
+ */ +Datum +pg_stat_get_subscription_errors(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_SUBSCRIPTION_ERROR_COLS 9 + Oid subid = PG_GETARG_OID(0); + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + PgStat_StatSubErrEntry *suberrent; + PgStat_StatSubRelErrEntry *relerrent; + HASH_SEQ_STATUS hstat; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + /* Get subscription errors */ + suberrent = pgstat_fetch_subscription_error(subid); + + /* Return NULL if the subscription doesn't have any errors */ + if (suberrent == NULL || suberrent->suberrors == NULL) + PG_RETURN_NULL(); + + hash_seq_init(&hstat, suberrent->suberrors); + while ((relerrent = (PgStat_StatSubRelErrEntry *) hash_seq_search(&hstat)) != NULL) + { + Datum values[PG_STAT_GET_SUBSCRIPTION_ERROR_COLS]; + bool nulls[PG_STAT_GET_SUBSCRIPTION_ERROR_COLS]; + + MemSet(nulls, 0, sizeof(nulls)); + + /* databaseid */ + values[0] = ObjectIdGetDatum(relerrent->databaseid); + + /* subid */ + values[1] = ObjectIdGetDatum(subid); + + /* relid */ + if (OidIsValid(relerrent->relid)) + values[2] = ObjectIdGetDatum(relerrent->relid); + else + nulls[2] = true; + + /* command */ + if (OidIsValid(relerrent->subrelid)) + nulls[3] = true; + else + values[3] = CStringGetTextDatum(logicalrep_message_type(relerrent->command)); + + /* xid */ + if (TransactionIdIsValid(relerrent->xid)) + values[4] = TransactionIdGetDatum(relerrent->xid); + else + nulls[4] = true; + + /* failure_source */ + if (OidIsValid(relerrent->subrelid)) + values[5] = CStringGetTextDatum("tablesync"); + else + values[5] = CStringGetTextDatum("apply"); + + /* failure_count */ + values[6] = Int64GetDatum(relerrent->failure_count); + + /* last_failure */ + values[7] = TimestampTzGetDatum(relerrent->last_failure); + + /* failure_message */ + values[8] = CStringGetTextDatum(relerrent->errmsg); + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c index a3e1c59a82..dd36850016 100644 --- a/src/backend/utils/error/elog.c +++ b/src/backend/utils/error/elog.c @@ -1441,6 +1441,22 @@ getinternalerrposition(void) return edata->internalpos; } +/* + * geterrmessage --- return the currently set error message + * + * This is only intended for use in error callback subroutines, since there + * is no other place outside elog.c where the concept is meaningful. 
+ */ +const char * +geterrmessage(void) +{ + ErrorData *edata = &errordata[errordata_stack_depth]; + + /* we don't bother incrementing recursion_depth */ + CHECK_STACK_DEPTH(); + + return (const char *) edata->message; +} /* * Functions to allow construction of error message strings separately from diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 8cd0252082..92297d60d1 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5321,6 +5321,14 @@ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}', prosrc => 'pg_stat_get_replication_slot' }, +{ oid => '8523', descr => 'statistics: information about logical replication error', + proname => 'pg_stat_get_subscription_error', prorows => '10', proisstrict => 'f', + proretset => 't', provolatile => 's', proparallel => 'r', + prorettype => 'record', proargtypes => 'oid', + proallargtypes => '{oid,oid,oid,oid,text,xid,text,int8,timestamptz,text}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,datid,subid,relid,command,xid,failure_source,failure_count,last_failure,last_failure_message}', + prosrc => 'pg_stat_get_subscription_errors' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 9612c0a6c2..215ac3abd5 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -14,6 +14,7 @@ #include "datatype/timestamp.h" #include "portability/instr_time.h" #include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */ +#include "replication/logicalproto.h" #include "utils/backend_progress.h" /* for backward compatibility */ #include "utils/backend_status.h" /* for backward compatibility */ #include "utils/hsearch.h" @@ -66,6 +67,8 @@ typedef enum StatMsgType PGSTAT_MTYPE_RESETSINGLECOUNTER, PGSTAT_MTYPE_RESETSLRUCOUNTER, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER, + PGSTAT_MTYPE_SUBSCRIPTIONERR, + PGSTAT_MTYPE_SUBSCRIPTIONPURGE, PGSTAT_MTYPE_AUTOVAC_START, PGSTAT_MTYPE_VACUUM, PGSTAT_MTYPE_ANALYZE, @@ -539,6 +542,47 @@ typedef struct PgStat_MsgReplSlot PgStat_Counter m_total_bytes; } PgStat_MsgReplSlot; +/* ---------- + * PgStat_MsgSubscriptionErr Sent by the apply worker or the table sync worker to + update the error happening during logical replication. + * ---------- + */ +#define PGSTAT_SUBSCRIPTIONERR_MSGLEN 256 +typedef struct PgStat_MsgSubscriptionErr +{ + PgStat_MsgHdr m_hdr; + + /* + * m_subid and m_subrelid are used to determine the subscription and the + * reporter of this error. m_subrelid is InvalidOid if reported by the + * apply worker, otherwise by the table sync worker. In table sync worker + * case, m_subrelid must be the same as m_relid. + */ + Oid m_subid; + Oid m_subrelid; + + bool m_clear; + Oid m_databaseid; + Oid m_relid; + LogicalRepMsgType m_command; + TransactionId m_xid; + TimestampTz m_last_failure; + char m_errmsg[PGSTAT_SUBSCRIPTIONERR_MSGLEN]; +} PgStat_MsgSubscriptionErr; + +/* ---------- + * PgStat_MsgSubscriptionPurge Sent by the autovacuum purge the subscriptions. 
+ * ---------- + */ +#define PGSTAT_NUM_SUBSCRIPTIONPURGE \ + ((PGSTAT_MSG_PAYLOAD - sizeof(int)) / sizeof(Oid)) + +typedef struct PgStat_MsgSubscriptionPurge +{ + PgStat_MsgHdr m_hdr; + int m_nentries; + Oid m_subids[PGSTAT_NUM_SUBSCRIPTIONPURGE]; +} PgStat_MsgSubscriptionPurge; /* ---------- * PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict @@ -710,6 +754,8 @@ typedef union PgStat_Msg PgStat_MsgChecksumFailure msg_checksumfailure; PgStat_MsgReplSlot msg_replslot; PgStat_MsgConn msg_conn; + PgStat_MsgSubscriptionErr msg_subscriptionerr; + PgStat_MsgSubscriptionPurge msg_subscriptionpurge; } PgStat_Msg; @@ -908,6 +954,28 @@ typedef struct PgStat_StatReplSlotEntry TimestampTz stat_reset_timestamp; } PgStat_StatReplSlotEntry; +/* + * Subscription error statistics kept in the stats collector + */ +typedef struct PgStat_StatSubErrEntry +{ + Oid subid; /* hash table key */ + HTAB *suberrors; +} PgStat_StatSubErrEntry; + +typedef struct PgStat_StatSubRelErrEntry +{ + Oid subrelid; /* InvalidOid if the apply worker, otherwise the table + * sync worker. hash table key. */ + Oid databaseid; + Oid relid; /* OID of relation related to the error. Must be the same + * as subrelid in the table sync case. */ + LogicalRepMsgType command; + TransactionId xid; + PgStat_Counter failure_count; + TimestampTz last_failure; + char errmsg[PGSTAT_SUBSCRIPTIONERR_MSGLEN]; +} PgStat_StatSubRelErrEntry; /* * Working state needed to accumulate per-function-call timing statistics. @@ -1011,6 +1079,10 @@ extern void pgstat_report_checksum_failure(void); extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat); extern void pgstat_report_replslot_create(const char *slotname); extern void pgstat_report_replslot_drop(const char *slotname); +extern void pgstat_report_subscription_error(Oid subid, Oid subrelid, Oid relid, + LogicalRepMsgType command, + TransactionId xid, const char *errmsg); +extern void pgstat_clear_subscription_error(Oid subid, Oid subrelid); extern void pgstat_initialize(void); @@ -1106,6 +1178,7 @@ extern PgStat_GlobalStats *pgstat_fetch_global(void); extern PgStat_WalStats *pgstat_fetch_stat_wal(void); extern PgStat_SLRUStats *pgstat_fetch_slru(void); extern PgStat_StatReplSlotEntry *pgstat_fetch_replslot(NameData slotname); +extern PgStat_StatSubErrEntry *pgstat_fetch_subscription_error(Oid subid); extern void pgstat_count_slru_page_zeroed(int slru_idx); extern void pgstat_count_slru_page_hit(int slru_idx); diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h index f53607e12e..155145a77d 100644 --- a/src/include/utils/elog.h +++ b/src/include/utils/elog.h @@ -222,6 +222,7 @@ extern int err_generic_string(int field, const char *str); extern int geterrcode(void); extern int geterrposition(void); extern int getinternalerrposition(void); +extern const char *geterrmessage(void); /*---------- diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index e5ab11275d..9cde7c2d09 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2094,6 +2094,18 @@ pg_stat_subscription| SELECT su.oid AS subid, st.latest_end_time FROM (pg_subscription su LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid))); +pg_stat_subscription_errors| SELECT d.datname, + s.subname, + e.relid, + e.command, + e.xid, + e.failure_source, + e.failure_count, + e.last_failure, + 
e.last_failure_message + FROM pg_subscription s, + (LATERAL pg_stat_get_subscription_error(s.oid) e(datid, subid, relid, command, xid, failure_source, failure_count, last_failure, last_failure_message) + JOIN pg_database d ON ((e.datid = d.oid))); pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, -- 2.24.3 (Apple Git-128)
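
(For reference, a hypothetical query against the new view after an apply worker has repeatedly failed on a constraint violation; all values below are made up for illustration:

    =# SELECT subname, command, xid, failure_source, failure_count, last_failure_message
         FROM pg_stat_subscription_errors;
     subname | command | xid | failure_source | failure_count |                 last_failure_message
    ---------+---------+-----+----------------+---------------+-------------------------------------------------------------
     sub     | INSERT  | 731 | apply          |             5 | duplicate key value violates unique constraint "test_pkey"
    (1 row)

Once the failing transaction is eventually applied or skipped, the error fields other than failure_count are cleared via pgstat_clear_subscription_error(), as implemented in pgstat_recv_subscription_error() above.)
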