On Thu, Oct 28, 2021 at 7:40 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Thu, Oct 28, 2021 at 10:36 AM Masahiko Sawada <sawada.m...@gmail.com> 
> wrote:
> >
> > On Wed, Oct 27, 2021 at 7:02 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > >
> > > On Thu, Oct 21, 2021 at 10:29 AM Masahiko Sawada <sawada.m...@gmail.com> 
> > > wrote:
> > > >
> > > >
> > > > I've attached updated patches.
> >
> > Thank you for the comments!
> >
> > >
> > > Few comments:
> > > ==============
> > > 1. Is the patch cleaning tablesync error entries except via vacuum? If
> > > not, can't we send a message to remove tablesync errors once tablesync
> > > is successful (say when we reset skip_xid or when tablesync is
> > > finished) or when we drop subscription? I think the same applies to
> > > apply worker. I think we may want to track it in some way whether an
> > > error has occurred before sending the message but relying completely
> > > on a vacuum might be a recipe for bloat. I think in the case of a
> > > drop subscription we can simply send the message as that is not a
> > > frequent operation. I might be missing something here because in the
> > > tests after drop subscription you are expecting the entries from the
> > > view to get cleared.
> >
> > Yes, I think we can have tablesync worker send a message to drop stats
> > once tablesync is successful. But if we do that also when dropping a
> > subscription, I think we need to do that only if the transaction is
> > committed, since we can drop a subscription that doesn't have a
> > replication slot and then roll back the transaction. Probably we can send
> > the message only when the subscription has a replication slot.
> >
>
> Right. And probably for the apply worker after updating skip xid.

I'm not sure it's better to drop the apply worker stats after resetting
skip xid (i.e., after skipping the transaction). Since the view is
cumulative and has last_error_time, I thought we could keep the apply
worker stats until the subscription is dropped. Also, since the error
reporting message could get lost, the absence of an entry in the view
doesn't mean the worker hasn't faced an issue.

>
> > In other cases, we can remember the subscriptions being dropped and
> > send messages to drop their statistics after committing the
> > transaction, but I'm not sure it's worth having it.
> >
>
> Yeah, let's not go to that extent. I think in most cases subscriptions
> will have corresponding slots.

Agreed.

>
> > FWIW, we completely rely on pgstat_vacuum_stat() for cleaning up the
> > dead tables and functions. And we don't expect there to be many
> > subscriptions in a database.
> >
>
> True, but we do send it for the database, so let's do it for the cases
> you explained in the first paragraph.

Agreed.

I've attached a new version of the patch. Since the syntax for skipping
a transaction ID is still under discussion, I've attached only the error
reporting patch for now.


Regards,

--
Masahiko Sawada
EDB:  https://www.enterprisedb.com/
From ea18cad8624a093aa103272c9ababa303a49e66e Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 16 Jul 2021 23:10:22 +0900
Subject: [PATCH v19 1/3] Add a subscription worker statistics view
 "pg_stat_subscription_workers".

This commit adds a new system view pg_stat_subscription_workers
that shows information about any errors that occur while applying
logical replication changes or while performing the initial table
synchronization.

Table sync worker error entries are removed when table synchronization
completes, and all entries of a subscription are removed when the
subscription is dropped, provided it has a replication slot.  Entries
left behind (e.g., for subscriptions without a replication slot, or
when the drop message is lost) are removed by (auto)vacuum.

It also adds an SQL function pg_stat_reset_subscription_worker() to
reset the error statistics of a single subscription worker.
---
 doc/src/sgml/monitoring.sgml                | 161 ++++++
 src/backend/catalog/system_functions.sql    |   2 +
 src/backend/catalog/system_views.sql        |  25 +
 src/backend/commands/subscriptioncmds.c     |  15 +-
 src/backend/postmaster/pgstat.c             | 604 ++++++++++++++++++++
 src/backend/replication/logical/tablesync.c |  13 +
 src/backend/replication/logical/worker.c    |  54 +-
 src/backend/utils/adt/pgstatfuncs.c         | 122 ++++
 src/include/catalog/pg_proc.dat             |  13 +
 src/include/pgstat.h                        | 126 ++++
 src/test/regress/expected/rules.out         |  20 +
 src/test/subscription/t/026_error_report.pl | 156 +++++
 src/tools/pgindent/typedefs.list            |   6 +
 13 files changed, 1313 insertions(+), 4 deletions(-)
 create mode 100644 src/test/subscription/t/026_error_report.pl

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3173ec2566..094c7239fa 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -627,6 +627,15 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       </entry>
      </row>
 
+     <row>
+      <entry><structname>pg_stat_subscription_workers</structname><indexterm><primary>pg_stat_subscription_workers</primary></indexterm></entry>
+      <entry>One row per subscription error reported by workers, showing
+      details of the error.
+      See <link linkend="monitoring-pg-stat-subscription-workers">
+      <structname>pg_stat_subscription_workers</structname></link> for details.
+      </entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
@@ -3034,6 +3043,136 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
  </sect2>
 
+ <sect2 id="monitoring-pg-stat-subscription-workers">
+  <title><structname>pg_stat_subscription_workers</structname></title>
+
+  <indexterm>
+   <primary>pg_stat_subscription_workers</primary>
+  </indexterm>
+
+  <para>
+   The <structname>pg_stat_subscription_workers</structname> view will contain
+   one row per subscription error reported by workers applying logical
+   replication changes and workers handling the initial data copy of the
+   subscribed tables.
+  </para>
+
+  <table id="pg-stat-subscription-workers" xreflabel="pg_stat_subscription_workers">
+   <title><structname>pg_stat_subscription_workers</structname> View</title>
+   <tgroup cols="1">
+    <thead>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       Column Type
+      </para>
+      <para>
+       Description
+      </para></entry>
+     </row>
+    </thead>
+
+    <tbody>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subname</structfield> <type>name</type>
+      </para>
+      <para>
+       Name of the subscription
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subrelid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the relation that the worker is synchronizing; NULL for the
+       main apply worker
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>relid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the relation that the worker was processing when the
+       error occurred
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>command</structfield> <type>text</type>
+      </para>
+      <para>
+       Name of the command being applied when the error occurred.  This field
+       is always NULL if the error was reported during the initial data
+       copy.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>xid</structfield> <type>xid</type>
+      </para>
+      <para>
+       ID of the publisher's transaction being applied when the error
+       occurred.  This field is always NULL if the error was reported
+       during the initial data copy.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>error_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of consecutive times the error occurred
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>error_message</structfield> <type>text</type>
+      </para>
+      <para>
+       The error message
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>last_error_time</structfield> <type>timestamp with time zone</type>
+      </para>
+      <para>
+       Time at which the last error occurred
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
+      </para>
+      <para>
+       Time at which these statistics were last reset
+      </para></entry>
+     </row>
+    </tbody>
+   </tgroup>
+  </table>
+
+ </sect2>
+
  <sect2 id="monitoring-pg-stat-ssl-view">
   <title><structname>pg_stat_ssl</structname></title>
 
@@ -5156,6 +5295,28 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
          can be granted EXECUTE to run the function.
        </para></entry>
       </row>
+
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+          <primary>pg_stat_reset_subscription_worker</primary>
+        </indexterm>
+        <function>pg_stat_reset_subscription_worker</function> ( <parameter>subid</parameter> <type>oid</type>, <parameter>relid</parameter> <type>oid</type> )
+        <returnvalue>void</returnvalue>
+       </para>
+       <para>
+        Resets the error statistics of a single subscription worker.  If
+        the argument <parameter>relid</parameter> is not <literal>NULL</literal>,
+        resets error statistics of the <literal>tablesync</literal> worker for
+        the relation with <parameter>relid</parameter>.  Otherwise, resets the
+        error statistics of the <literal>apply</literal> worker running on the
+        subscription with <parameter>subid</parameter>.
+       </para>
+       <para>
+        This function is restricted to superusers by default, but other users
+        can be granted EXECUTE to run the function.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 54c93b16c4..921fce5546 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -639,6 +639,8 @@ REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM publ
 
 REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public;
 
+REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_worker(oid, oid) FROM public;
+
 REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public;
 
 REVOKE EXECUTE ON FUNCTION lo_import(text, oid) FROM public;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index eb560955cd..a2ee00c6fd 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1261,3 +1261,28 @@ REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
               substream, subtwophasestate, subslotname, subsynccommit, subpublications)
     ON pg_subscription TO public;
+
+CREATE VIEW pg_stat_subscription_workers AS
+    SELECT
+	w.subid,
+	s.subname,
+	w.subrelid,
+	w.relid,
+	w.command,
+	w.xid,
+	w.error_count,
+	w.error_message,
+	w.last_error_time,
+	w.stats_reset
+    FROM (SELECT
+              oid as subid,
+              NULL as relid
+          FROM pg_subscription
+          UNION ALL
+          SELECT
+              srsubid as subid,
+              srrelid as relid
+          FROM pg_subscription_rel
+          WHERE srsubstate <> 'r') sr,
+          LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w
+          JOIN pg_subscription s ON (w.subid = s.oid);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index c47ba26369..18962b91e1 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -32,6 +32,7 @@
 #include "executor/executor.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "pgstat.h"
 #include "replication/logicallauncher.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
@@ -1204,7 +1205,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * Since dropping a replication slot is not transactional, the replication
 	 * slot stays dropped even if the transaction rolls back.  So we cannot
 	 * run DROP SUBSCRIPTION inside a transaction block if dropping the
-	 * replication slot.
+	 * replication slot.  Also, in this case, we report a message for dropping
+	 * the subscription to the stats collector.
 	 *
 	 * XXX The command name should really be something like "DROP SUBSCRIPTION
 	 * of a subscription that is associated with a replication slot", but we
@@ -1377,6 +1379,17 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 	PG_END_TRY();
 
+	/*
+	 * Send a message for dropping this subscription to the stats collector. We
+	 * can safely report dropping the subscription statistics here if the
+	 * subscription is associated with a replication slot since we cannot run
+	 * DROP SUBSCRIPTION inside a transaction block.  Subscription statistics will
+	 * be removed later by (auto)vacuum either if it's not associated with a
+	 * replication slot or if the message for dropping the subscription gets lost.
+	 */
+	if (slotname)
+		pgstat_report_subscription_drop(subid);
+
 	table_close(rel, NoLock);
 }
 
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index b7d0fbaefd..b7883ec477 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -41,6 +41,8 @@
 #include "catalog/catalog.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_proc.h"
+#include "catalog/pg_subscription.h"
+#include "catalog/pg_subscription_rel.h"
 #include "common/ip.h"
 #include "executor/instrument.h"
 #include "libpq/libpq.h"
@@ -106,6 +108,7 @@
 #define PGSTAT_TAB_HASH_SIZE	512
 #define PGSTAT_FUNCTION_HASH_SIZE	512
 #define PGSTAT_REPLSLOT_HASH_SIZE	32
+#define PGSTAT_SUBWORKER_HASH_SIZE	32
 
 
 /* ----------
@@ -282,6 +285,7 @@ static PgStat_GlobalStats globalStats;
 static PgStat_WalStats walStats;
 static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
 static HTAB *replSlotStatHash = NULL;
+static HTAB *subWorkerStatHash = NULL;
 
 /*
  * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
@@ -332,6 +336,13 @@ static bool pgstat_db_requested(Oid databaseid);
 static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it);
 static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts);
 
+static PgStat_StatSubWorkerEntry *pgstat_get_subworker_entry(Oid subid, Oid subrelid,
+															 bool create);
+static void pgstat_reset_subworker_entry(PgStat_StatSubWorkerEntry *wentry, TimestampTz ts);
+static void pgstat_vacuum_subworker_stats(void);
+static void pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg);
+static void pgstat_send_subworker_purge(PgStat_MsgSubWorkerPurge *msg);
+
 static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now);
 static void pgstat_send_funcstats(void);
 static void pgstat_send_slru(void);
@@ -356,6 +367,7 @@ static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, in
 static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len);
 static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len);
 static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len);
+static void pgstat_recv_resetsubworkercounter(PgStat_MsgResetsubworkercounter *msg, int len);
 static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
 static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
 static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
@@ -373,6 +385,9 @@ static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len);
 static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len);
 static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
+static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len);
+static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len);
+static void pgstat_recv_subworker_purge(PgStat_MsgSubWorkerPurge *msg, int len);
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -1178,6 +1193,10 @@ pgstat_vacuum_stat(void)
 		}
 	}
 
+	/* Clean up dead subscription worker statistics */
+	if (subWorkerStatHash)
+		pgstat_vacuum_subworker_stats();
+
 	/*
 	 * Lookup our own database entry; if not found, nothing more to do.
 	 */
@@ -1355,6 +1374,210 @@ pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid)
 }
 
 
+/* PgStat_StatSubWorkerEntry comparator: sort by subid, then by subrelid */
+static int
+subworker_stats_comparator(const ListCell *a, const ListCell *b)
+{
+	PgStat_StatSubWorkerEntry *entry1 = (PgStat_StatSubWorkerEntry *) lfirst(a);
+	PgStat_StatSubWorkerEntry *entry2 = (PgStat_StatSubWorkerEntry *) lfirst(b);
+	int			ret;
+
+	ret = oid_cmp(&entry1->key.subid, &entry2->key.subid);
+	if (ret != 0)
+		return ret;
+
+	return oid_cmp(&entry1->key.subrelid, &entry2->key.subrelid);
+}
+
+/* ----------
+ * pgstat_vacuum_subworker_stats() -
+ *
+ * Subroutine for pgstat_vacuum_stat: tell the collector to remove dead
+ * subscriptions and worker statistics.
+ * ----------
+ */
+static void
+pgstat_vacuum_subworker_stats(void)
+{
+	PgStat_StatSubWorkerEntry *wentry;
+	PgStat_MsgSubscriptionPurge spmsg;
+	PgStat_MsgSubWorkerPurge wpmsg;
+	HASH_SEQ_STATUS hstat;
+	HTAB	   *subids;
+	List	   *subworker_stats = NIL;
+	List	   *not_ready_rels = NIL;
+	ListCell   *lc1;
+
+	/* Build the list of worker stats and sort it by subid and subrelid */
+	hash_seq_init(&hstat, subWorkerStatHash);
+	while ((wentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL)
+		subworker_stats = lappend(subworker_stats, wentry);
+	list_sort(subworker_stats, subworker_stats_comparator);
+
+	/*
+	 * Read pg_subscription and make a list of OIDs of all existing
+	 * subscriptions
+	 */
+	subids = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid);
+
+	/*
+	 * Search for all the dead subscriptions and unnecessary table sync worker
+	 * entries in stats hashtable and tell the stats collector to drop them.
+	 */
+	spmsg.m_nentries = 0;
+	wpmsg.m_nentries = 0;
+	wpmsg.m_subid = InvalidOid;
+	foreach(lc1, subworker_stats)
+	{
+		ListCell   *lc2;
+		bool		keep_it = false;
+
+		wentry = (PgStat_StatSubWorkerEntry *) lfirst(lc1);
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* Skip if we already registered this subscription to purge */
+		if (spmsg.m_nentries > 0 &&
+			spmsg.m_subids[spmsg.m_nentries - 1] == wentry->key.subid)
+			continue;
+
+		/* Check if the subscription is dead */
+		if (hash_search(subids, (void *) &(wentry->key.subid), HASH_FIND, NULL) == NULL)
+		{
+			/* This subscription is dead, add the subid to the message */
+			spmsg.m_subids[spmsg.m_nentries++] = wentry->key.subid;
+
+			/*
+			 * If the message is full, send it out and reinitialize to empty
+			 */
+			if (spmsg.m_nentries >= PGSTAT_NUM_SUBSCRIPTIONPURGE)
+			{
+				pgstat_send_subscription_purge(&spmsg);
+				spmsg.m_nentries = 0;
+			}
+
+			continue;
+		}
+
+		/*
+		 * This subscription is alive.  The next step is to search for table
+		 * sync worker entries whose tables are already in ready state (or no
+		 * longer belong to the subscription).  These should be removed.
+		 */
+
+		/* We remove only table sync entries in the current database */
+		if (wentry->dbid != MyDatabaseId)
+			continue;
+
+		/* Skip if it's an apply worker entry */
+		if (!OidIsValid(wentry->key.subrelid))
+			continue;
+
+		if (wpmsg.m_subid != wentry->key.subid)
+		{
+			/*
+			 * Send the purge message for the previously collected table sync
+			 * entries, if any.
+			 */
+			if (wpmsg.m_nentries > 0)
+			{
+				pgstat_send_subworker_purge(&wpmsg);
+				wpmsg.m_nentries = 0;
+			}
+
+			/* Clean up the previously collected relations */
+			list_free_deep(not_ready_rels);
+
+			/* Refresh the not-ready-relations of this subscription */
+			not_ready_rels = GetSubscriptionNotReadyRelations(wentry->key.subid);
+
+			/* Prepare the worker purge message for the subscription */
+			wpmsg.m_subid = wentry->key.subid;
+		}
+
+		/*
+		 * Check if the table is still being synchronized or no longer belongs
+		 * to the subscription.
+		 */
+		foreach(lc2, not_ready_rels)
+		{
+			SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc2);
+
+			if (relstate->relid == wentry->key.subrelid)
+			{
+				/* This table is still being synchronized, so keep it */
+				keep_it = true;
+				break;
+			}
+		}
+
+		if (keep_it)
+			continue;
+
+		/* Add the table to the worker purge message */
+		wpmsg.m_relids[wpmsg.m_nentries++] = wentry->key.subrelid;
+
+		/*
+		 * If the worker purge message is full, send it out and reinitialize
+		 * to empty
+		 */
+		if (wpmsg.m_nentries >= PGSTAT_NUM_SUBWORKERPURGE)
+		{
+			pgstat_send_subworker_purge(&wpmsg);
+			wpmsg.m_nentries = 0;
+		}
+	}
+
+	/* Send the remaining dead subscriptions */
+	if (spmsg.m_nentries > 0)
+		pgstat_send_subscription_purge(&spmsg);
+
+	/* Send the remaining dead table sync worker entries */
+	if (wpmsg.m_nentries > 0)
+		pgstat_send_subworker_purge(&wpmsg);
+
+	/* Clean up */
+	list_free_deep(not_ready_rels);
+	list_free(subworker_stats);
+	hash_destroy(subids);
+}
+
+/* --------
+ * pgstat_send_subscription_purge() -
+ *
+ *	Send a subscription purge message to the collector
+ * --------
+ */
+static void
+pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg)
+{
+	int			len;
+
+	len = offsetof(PgStat_MsgSubscriptionPurge, m_subids[0])
+		+ msg->m_nentries * sizeof(Oid);
+
+	pgstat_setheader(&msg->m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONPURGE);
+	pgstat_send(msg, len);
+}
+
+/* --------
+ * pgstat_send_subworker_purge() -
+ *
+ *	Send a subscription worker purge message to the collector
+ * --------
+ */
+static void
+pgstat_send_subworker_purge(PgStat_MsgSubWorkerPurge *msg)
+{
+	int			len;
+
+	len = offsetof(PgStat_MsgSubWorkerPurge, m_relids[0])
+		+ msg->m_nentries * sizeof(Oid);
+
+	pgstat_setheader(&msg->m_hdr, PGSTAT_MTYPE_SUBWORKERPURGE);
+	pgstat_send(msg, len);
+}
+
 /* ----------
  * pgstat_drop_database() -
  *
@@ -1544,6 +1767,24 @@ pgstat_reset_replslot_counter(const char *name)
 	pgstat_send(&msg, sizeof(msg));
 }
 
+/* ----------
+ * pgstat_reset_subworker_stats() -
+ *
+ *	Tell the collector to reset the subscription worker statistics.
+ * ----------
+ */
+void
+pgstat_reset_subworker_stats(Oid subid, Oid subrelid)
+{
+	PgStat_MsgResetsubworkercounter msg;
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSUBWORKERCOUNTER);
+	msg.m_subid = subid;
+	msg.m_subrelid = subrelid;
+
+	pgstat_send(&msg, sizeof(PgStat_MsgResetsubworkercounter));
+}
+
 /* ----------
  * pgstat_report_autovac() -
  *
@@ -1869,6 +2110,70 @@ pgstat_report_replslot_drop(const char *slotname)
 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
+/* ----------
+ * pgstat_report_subworker_error() -
+ *
+ *	Tell the collector about the subscription worker error.
+ * ----------
+ */
+void
+pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
+							  LogicalRepMsgType command, TransactionId xid,
+							  const char *errmsg)
+{
+	PgStat_MsgSubWorkerError msg;
+	int			len;
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERERROR);
+	msg.m_subid = subid;
+	msg.m_subrelid = subrelid;
+	msg.m_dbid = MyDatabaseId;
+	msg.m_relid = relid;
+	msg.m_command = command;
+	msg.m_xid = xid;
+	msg.m_timestamp = GetCurrentTimestamp();
+	strlcpy(msg.m_message, errmsg, PGSTAT_SUBWORKERERROR_MSGLEN);
+
+	/* Send only the used part of the (possibly truncated) message */
+	len = offsetof(PgStat_MsgSubWorkerError, m_message[0]) + strlen(msg.m_message) + 1;
+
+	pgstat_send(&msg, len);
+}
+
+/* ----------
+ * pgstat_report_subworker_drop() -
+ *
+ *	Tell the collector about dropping the subscription worker statistics.
+ *	This is used when a table sync worker exits.
+ * ----------
+ */
+void
+pgstat_report_subworker_drop(Oid subid, Oid subrelid)
+{
+	PgStat_MsgSubWorkerPurge msg;
+
+	msg.m_subid = subid;
+	msg.m_relids[0] = subrelid;
+	msg.m_nentries = 1;
+	pgstat_send_subworker_purge(&msg);
+}
+
+/* ----------
+ * pgstat_report_subscription_drop() -
+ *
+ *	Tell the collector about dropping the subscription.
+ * ----------
+ */
+void
+pgstat_report_subscription_drop(Oid subid)
+{
+	PgStat_MsgSubscriptionPurge msg;
+
+	msg.m_subids[0] = subid;
+	msg.m_nentries = 1;
+	pgstat_send_subscription_purge(&msg);
+}
+
 /* ----------
  * pgstat_ping() -
  *
@@ -2987,6 +3292,22 @@ pgstat_fetch_replslot(NameData slotname)
 	return pgstat_get_replslot_entry(slotname, false);
 }
 
+/*
+ * ---------
+ * pgstat_fetch_subworker() -
+ *
+ *	Support function for the SQL-callable pgstat* functions. Returns
+ *	a pointer to the subscription worker struct.
+ * ---------
+ */
+PgStat_StatSubWorkerEntry *
+pgstat_fetch_subworker(Oid subid, Oid subrelid)
+{
+	backend_read_statsfile();
+
+	return pgstat_get_subworker_entry(subid, subrelid, false);
+}
+
 /*
  * Shut down a single backend's statistics reporting at process exit.
  *
@@ -3498,6 +3819,11 @@ PgstatCollectorMain(int argc, char *argv[])
 													 len);
 					break;
 
+				case PGSTAT_MTYPE_RESETSUBWORKERCOUNTER:
+					pgstat_recv_resetsubworkercounter(&msg.msg_resetsubworkercounter,
+													  len);
+					break;
+
 				case PGSTAT_MTYPE_AUTOVAC_START:
 					pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
 					break;
@@ -3568,6 +3894,18 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_disconnect(&msg.msg_disconnect, len);
 					break;
 
+				case PGSTAT_MTYPE_SUBSCRIPTIONPURGE:
+					pgstat_recv_subscription_purge(&msg.msg_subscriptionpurge, len);
+					break;
+
+				case PGSTAT_MTYPE_SUBWORKERERROR:
+					pgstat_recv_subworker_error(&msg.msg_subworkererror, len);
+					break;
+
+				case PGSTAT_MTYPE_SUBWORKERPURGE:
+					pgstat_recv_subworker_purge(&msg.msg_subworkerpurge, len);
+					break;
+
 				default:
 					break;
 			}
@@ -3868,6 +4206,22 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 		}
 	}
 
+	/*
+	 * Write subscription worker stats struct
+	 */
+	if (subWorkerStatHash)
+	{
+		PgStat_StatSubWorkerEntry *wentry;
+
+		hash_seq_init(&hstat, subWorkerStatHash);
+		while ((wentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL)
+		{
+			fputc('S', fpout);
+			rc = fwrite(wentry, sizeof(PgStat_StatSubWorkerEntry), 1, fpout);
+			(void) rc;			/* we'll check for error with ferror */
+		}
+	}
+
 	/*
 	 * No more output to be done. Close the temp file and replace the old
 	 * pgstat.stat with it.  The ferror() check replaces testing for error
@@ -4329,6 +4683,48 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 					break;
 				}
 
+				/*
+				 * 'S'	A PgStat_StatSubWorkerEntry struct describing
+				 * subscription worker statistics.
+				 */
+			case 'S':
+				{
+					PgStat_StatSubWorkerEntry wbuf;
+					PgStat_StatSubWorkerEntry *wentry;
+
+					/* Read the subscription worker entry */
+					if (fread(&wbuf, 1, sizeof(PgStat_StatSubWorkerEntry), fpin)
+						!= sizeof(PgStat_StatSubWorkerEntry))
+					{
+						ereport(pgStatRunningInCollector ? LOG : WARNING,
+								(errmsg("corrupted statistics file \"%s\"",
+										statfile)));
+						goto done;
+					}
+
+					/* Create hash table if we don't have it already. */
+					if (subWorkerStatHash == NULL)
+					{
+						HASHCTL		hash_ctl;
+
+						hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey);
+						hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry);
+						hash_ctl.hcxt = pgStatLocalContext;
+						subWorkerStatHash = hash_create("Subscription worker stat entries",
+														PGSTAT_SUBWORKER_HASH_SIZE,
+														&hash_ctl,
+														HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+					}
+
+					/* Enter the subscription worker entry and copy its fields */
+					wentry =
+						(PgStat_StatSubWorkerEntry *) hash_search(subWorkerStatHash,
+																  (void *) &wbuf.key,
+																  HASH_ENTER, NULL);
+					memcpy(wentry, &wbuf, sizeof(PgStat_StatSubWorkerEntry));
+					break;
+				}
+
 			case 'E':
 				goto done;
 
@@ -4541,6 +4937,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 	PgStat_WalStats myWalStats;
 	PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
 	PgStat_StatReplSlotEntry myReplSlotStats;
+	PgStat_StatSubWorkerEntry mySubWorkerStats;
 	FILE	   *fpin;
 	int32		format_id;
 	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
@@ -4671,6 +5068,22 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 				}
 				break;
 
+				/*
+				 * 'S'	A PgStat_StatSubWorkerEntry struct describing
+				 * subscription worker statistics.
+				 */
+			case 'S':
+				if (fread(&mySubWorkerStats, 1, sizeof(mySubWorkerStats), fpin)
+					!= sizeof(mySubWorkerStats))
+				{
+					ereport(pgStatRunningInCollector ? LOG : WARNING,
+							(errmsg("corrupted statistics file \"%s\"",
+									statfile)));
+					FreeFile(fpin);
+					return false;
+				}
+				break;
+
 			case 'E':
 				goto done;
 
@@ -4876,6 +5289,7 @@ pgstat_clear_snapshot(void)
 	pgStatLocalContext = NULL;
 	pgStatDBHash = NULL;
 	replSlotStatHash = NULL;
+	subWorkerStatHash = NULL;
 
 	/*
 	 * Historically the backend_status.c facilities lived in this file, and
@@ -5344,6 +5758,31 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
 	}
 }
 
+/* ----------
+ * pgstat_recv_resetsubworkercounter() -
+ *
+ *	Process a RESETSUBWORKERCOUNTER message.
+ * ----------
+ */
+static void
+pgstat_recv_resetsubworkercounter(PgStat_MsgResetsubworkercounter *msg, int len)
+{
+	PgStat_StatSubWorkerEntry *wentry;
+
+	/* Get subscription worker stats */
+	wentry = pgstat_get_subworker_entry(msg->m_subid, msg->m_subrelid, false);
+
+	/*
+	 * Nothing to do if the subscription error entry is not found.  This could
+	 * happen when the subscription is dropped and the message for dropping
+	 * the subscription arrives before the message for resetting the error.
+	 */
+	if (wentry == NULL)
+		return;
+
+	/* reset the entry and set reset timestamp */
+	pgstat_reset_subworker_entry(wentry, GetCurrentTimestamp());
+}
 
 /* ----------
  * pgstat_recv_autovac() -
@@ -5816,6 +6255,105 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
 	}
 }
 
+/* ----------
+ * pgstat_recv_subscription_purge() -
+ *
+ *	Process a SUBSCRIPTIONPURGE message.
+ * ----------
+ */
+static void
+pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len)
+{
+	HASH_SEQ_STATUS sstat;
+	PgStat_StatSubWorkerEntry *wentry;
+
+	if (subWorkerStatHash == NULL)
+		return;
+
+	/* Remove all subscription worker statistics of the given subscriptions */
+	hash_seq_init(&sstat, subWorkerStatHash);
+	while ((wentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&sstat)) != NULL)
+	{
+		for (int i = 0; i < msg->m_nentries; i++)
+		{
+			if (wentry->key.subid == msg->m_subids[i])
+			{
+				(void) hash_search(subWorkerStatHash, (void *) &(wentry->key),
+								   HASH_REMOVE, NULL);
+				break;
+			}
+		}
+	}
+}
+
+/* ----------
+ * pgstat_recv_subworker_error() -
+ *
+ *	Process a SUBWORKERERROR message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len)
+{
+	PgStat_StatSubWorkerEntry *wentry;
+
+	/* Get the subscription worker stats */
+	wentry = pgstat_get_subworker_entry(msg->m_subid, msg->m_subrelid, true);
+	Assert(wentry);
+
+	/*
+	 * Update only the counter and timestamp if we received the same error
+	 * again
+	 */
+	if (wentry->dbid == msg->m_dbid &&
+		wentry->relid == msg->m_relid &&
+		wentry->command == msg->m_command &&
+		wentry->xid == msg->m_xid &&
+		strcmp(wentry->error_message, msg->m_message) == 0)
+	{
+		wentry->error_count++;
+		wentry->error_time = msg->m_timestamp;
+		return;
+	}
+
+	/* Otherwise, update the error information */
+	wentry->dbid = msg->m_dbid;
+	wentry->relid = msg->m_relid;
+	wentry->command = msg->m_command;
+	wentry->xid = msg->m_xid;
+	wentry->error_count = 1;
+	wentry->error_time = msg->m_timestamp;
+	strlcpy(wentry->error_message, msg->m_message, PGSTAT_SUBWORKERERROR_MSGLEN);
+}
+
+/* ----------
+ * pgstat_recv_subworker_purge() -
+ *
+ *	Process a SUBWORKERPURGE message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_purge(PgStat_MsgSubWorkerPurge *msg, int len)
+{
+	PgStat_StatSubWorkerKey key;
+
+	if (subWorkerStatHash == NULL)
+		return;
+
+	key.subid = msg->m_subid;
+	for (int i = 0; i < msg->m_nentries; i++)
+	{
+		/*
+		 * Must be a table sync worker error as the apply worker error is
+		 * dropped only when the subscription is dropped.
+		 */
+		Assert(OidIsValid(msg->m_relids[i]));
+
+		key.subrelid = msg->m_relids[i];
+		(void) hash_search(subWorkerStatHash, (void *) &key, HASH_REMOVE, NULL);
+	}
+}
+
 /* ----------
  * pgstat_write_statsfile_needed() -
  *
@@ -5934,6 +6472,72 @@ pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts)
 	slotent->stat_reset_timestamp = ts;
 }
 
+/* ----------
+ * pgstat_get_subworker_entry
+ *
+ * Return the subscription worker entry for the given subscription OID and
+ * relation OID.  If subrelid is InvalidOid, the apply worker's entry is
+ * returned; otherwise, the entry of the table sync worker for subrelid.
+ * If no entry exists and create is true, a new entry is initialized and
+ * returned.  Otherwise, NULL is returned.
+ * ----------
+ */
+static PgStat_StatSubWorkerEntry *
+pgstat_get_subworker_entry(Oid subid, Oid subrelid, bool create)
+{
+	PgStat_StatSubWorkerEntry *wentry;
+	PgStat_StatSubWorkerKey key;
+	HASHACTION	action;
+	bool		found;
+
+	if (subWorkerStatHash == NULL)
+	{
+		HASHCTL		hash_ctl;
+
+		if (!create)
+			return NULL;
+
+		hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey);
+		hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry);
+		subWorkerStatHash = hash_create("Subscription worker stat entries",
+										PGSTAT_SUBWORKER_HASH_SIZE,
+										&hash_ctl,
+										HASH_ELEM | HASH_BLOBS);
+	}
+
+	key.subid = subid;
+	key.subrelid = subrelid;
+	action = (create ? HASH_ENTER : HASH_FIND);
+	wentry = (PgStat_StatSubWorkerEntry *) hash_search(subWorkerStatHash,
+													   (void *) &key,
+													   action, &found);
+
+	/* initialize fields */
+	if (create && !found)
+		pgstat_reset_subworker_entry(wentry, 0);
+
+	return wentry;
+}
+
+/* ----------
+ * pgstat_reset_subworker_entry
+ *
+ * Reset the given subscription worker statistics.
+ * ----------
+ */
+static void
+pgstat_reset_subworker_entry(PgStat_StatSubWorkerEntry *wentry, TimestampTz ts)
+{
+	wentry->dbid = InvalidOid;
+	wentry->relid = InvalidOid;
+	wentry->command = 0;
+	wentry->xid = InvalidTransactionId;
+	wentry->error_count = 0;
+	wentry->error_time = 0;
+	wentry->error_message[0] = '\0';
+	wentry->stat_reset_timestamp = ts;
+}
+
 /*
  * pgstat_slru_index
  *
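The collector's handling of a SUBWORKERERROR message boils down to one rule: the same error again only bumps the counter and timestamp, while a different error overwrites the stored one and restarts the counter at 1. A minimal, self-contained sketch of that rule follows; the types and names (SketchErrorEntry, sketch_record_error) are hypothetical stand-ins, not part of the patch, and it assumes (as the sender does) that messages are already truncated below the buffer size.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Simplified stand-ins for the PostgreSQL types (assumptions for this sketch). */
typedef unsigned int Oid;
typedef unsigned int TransactionId;
typedef int64_t TimestampTz;

#define SKETCH_MSGLEN 256		/* mirrors PGSTAT_SUBWORKERERROR_MSGLEN */

typedef struct SketchErrorEntry
{
	Oid			dbid;
	Oid			relid;
	int			command;
	TransactionId xid;
	int64_t		error_count;
	TimestampTz error_time;
	char		error_message[SKETCH_MSGLEN];
} SketchErrorEntry;

/*
 * Apply one error report to an entry.  Assumes msg is shorter than
 * SKETCH_MSGLEN, since the sender truncates it before sending.
 */
static void
sketch_record_error(SketchErrorEntry *e, Oid dbid, Oid relid, int command,
					TransactionId xid, TimestampTz ts, const char *msg)
{
	assert(e != NULL && msg != NULL);

	/* Same error as the stored one: only update counter and timestamp */
	if (e->dbid == dbid && e->relid == relid && e->command == command &&
		e->xid == xid && strcmp(e->error_message, msg) == 0)
	{
		e->error_count++;
		e->error_time = ts;
		return;
	}

	/* Different error: overwrite everything and restart the counter */
	e->dbid = dbid;
	e->relid = relid;
	e->command = command;
	e->xid = xid;
	e->error_count = 1;
	e->error_time = ts;
	/* bounded, always NUL-terminated copy, like strlcpy() */
	snprintf(e->error_message, sizeof(e->error_message), "%s", msg);
}
```

Note that a report with a different xid counts as a new error, so error_count only measures repeats of the identical failure.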
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a43c..9b6d0579b4 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -331,6 +331,19 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		 */
 		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
 
+		/*
+		 * Send a message to the stats collector to drop the subscription
+		 * worker statistics.  Since UDP does not guarantee the delivery order
+		 * of messages, a message reporting statistics such as an error may
+		 * arrive after the message removing the statistics.  If the messages
+		 * arrive in reverse order or the drop message is lost, the statistics
+		 * are not dropped here.  However, (auto)vacuum cleans up the
+		 * statistics of table sync workers that have already reached the
+		 * ready state.
+		 */
+		pgstat_report_subworker_drop(MyLogicalRepWorker->subid,
+									 MyLogicalRepWorker->relid);
+
 		finish_sync_worker();
 	}
 	else
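The comment in tablesync.c relies on the collector applying messages in whatever order they arrive. A toy model makes the consequence concrete: a fixed array stands in for subWorkerStatHash, and all names here (SketchCollector, sketch_process, sketch_run) are hypothetical. An ERROR creates a missing entry and a DROP removes an existing one, so a reordered DROP/ERROR pair leaves a stale entry that only a later (auto)vacuum purge would remove.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;

typedef enum { MSG_ERROR, MSG_DROP } SketchMsgType;

typedef struct { SketchMsgType type; Oid subid; Oid subrelid; } SketchMsg;

#define SKETCH_MAX_ENTRIES 16

typedef struct
{
	bool		used[SKETCH_MAX_ENTRIES];
	Oid			subid[SKETCH_MAX_ENTRIES];
	Oid			subrelid[SKETCH_MAX_ENTRIES];
} SketchCollector;

static int
sketch_find(const SketchCollector *c, Oid subid, Oid subrelid)
{
	for (int i = 0; i < SKETCH_MAX_ENTRIES; i++)
		if (c->used[i] && c->subid[i] == subid && c->subrelid[i] == subrelid)
			return i;
	return -1;
}

/* ERROR creates the entry if missing; DROP removes it if present. */
static void
sketch_process(SketchCollector *c, const SketchMsg *m)
{
	int			idx = sketch_find(c, m->subid, m->subrelid);

	if (m->type == MSG_DROP)
	{
		if (idx >= 0)
			c->used[idx] = false;
		return;
	}
	if (idx < 0)
	{
		for (idx = 0; idx < SKETCH_MAX_ENTRIES && c->used[idx]; idx++)
			;
		assert(idx < SKETCH_MAX_ENTRIES);	/* sketch table must not be full */
		c->used[idx] = true;
		c->subid[idx] = m->subid;
		c->subrelid[idx] = m->subrelid;
	}
}

/* Deliver n messages in the given order to a fresh collector; return entries left. */
static int
sketch_run(const SketchMsg *msgs, size_t n)
{
	SketchCollector c = {0};
	int			left = 0;

	for (size_t i = 0; i < n; i++)
		sketch_process(&c, &msgs[i]);
	for (int i = 0; i < SKETCH_MAX_ENTRIES; i++)
		left += c.used[i];
	return left;
}
```

Delivered as sent (ERROR, then DROP) nothing remains; swapped (DROP, then ERROR) one entry survives.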
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 8d96c926b4..3a40684fa5 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3329,6 +3329,7 @@ void
 ApplyWorkerMain(Datum main_arg)
 {
 	int			worker_slot = DatumGetInt32(main_arg);
+	MemoryContext cctx = CurrentMemoryContext;
 	MemoryContext oldctx;
 	char		originname[NAMEDATALEN];
 	XLogRecPtr	origin_startpos;
@@ -3429,8 +3430,30 @@ ApplyWorkerMain(Datum main_arg)
 	{
 		char	   *syncslotname;
 
-		/* This is table synchronization worker, call initial sync. */
-		syncslotname = LogicalRepSyncTableStart(&origin_startpos);
+		PG_TRY();
+		{
+			/* This is table synchronization worker, call initial sync. */
+			syncslotname = LogicalRepSyncTableStart(&origin_startpos);
+		}
+		PG_CATCH();
+		{
+			MemoryContext ecxt = MemoryContextSwitchTo(cctx);
+			ErrorData  *errdata = CopyErrorData();
+
+			/*
+			 * Report the table sync error. There is no corresponding message
+			 * type for table synchronization.
+			 */
+			pgstat_report_subworker_error(MyLogicalRepWorker->subid,
+										  MyLogicalRepWorker->relid,
+										  MyLogicalRepWorker->relid,
+										  0,	/* message type */
+										  InvalidTransactionId,
+										  errdata->message);
+			MemoryContextSwitchTo(ecxt);
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
 
 		/* allocate slot name in long-lived context */
 		myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
@@ -3548,7 +3571,32 @@ ApplyWorkerMain(Datum main_arg)
 	}
 
 	/* Run the main loop. */
-	LogicalRepApplyLoop(origin_startpos);
+	PG_TRY();
+	{
+		LogicalRepApplyLoop(origin_startpos);
+	}
+	PG_CATCH();
+	{
+		/* report the apply error */
+		if (apply_error_callback_arg.command != 0)
+		{
+			MemoryContext ecxt = MemoryContextSwitchTo(cctx);
+			ErrorData  *errdata = CopyErrorData();
+
+			pgstat_report_subworker_error(MyLogicalRepWorker->subid,
+										  MyLogicalRepWorker->relid,
+										  apply_error_callback_arg.rel != NULL
+										  ? apply_error_callback_arg.rel->localreloid
+										  : InvalidOid,
+										  apply_error_callback_arg.command,
+										  apply_error_callback_arg.remote_xid,
+										  errdata->message);
+			MemoryContextSwitchTo(ecxt);
+		}
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
 
 	proc_exit(0);
 }
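Both PG_TRY blocks added to worker.c follow a report-then-rethrow shape. The sketch below imitates that control flow with plain setjmp/longjmp and a hand-rolled handler stack; it is not PostgreSQL's elog.c machinery, every name in it is hypothetical, and it deliberately leaves out the memory-context switch and CopyErrorData() call that the real code needs.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdio.h>
#include <string.h>

/* Hand-rolled stand-ins for PG_exception_stack and the current error message. */
static jmp_buf *sketch_exception_stack = NULL;
static char sketch_errmsg[256];
static char sketch_reported[256];	/* what the "stats collector" received */

/* Like ereport(ERROR): remember the message and jump to the innermost handler. */
static void
sketch_raise(const char *msg)
{
	assert(sketch_exception_stack != NULL);
	snprintf(sketch_errmsg, sizeof(sketch_errmsg), "%s", msg);
	longjmp(*sketch_exception_stack, 1);
}

/* Stand-in for LogicalRepApplyLoop(): fails on demand. */
static void
sketch_apply_loop(int fail)
{
	if (fail)
		sketch_raise("duplicate key value violates unique constraint");
}

/* Shape of PG_TRY { loop } PG_CATCH { report; PG_RE_THROW(); } PG_END_TRY(); */
static void
sketch_run_apply(int fail)
{
	jmp_buf    *save = sketch_exception_stack;
	jmp_buf		local;

	if (setjmp(local) == 0)
	{
		sketch_exception_stack = &local;
		sketch_apply_loop(fail);
	}
	else
	{
		/* pop our handler, report a copy of the error, then re-throw */
		sketch_exception_stack = save;
		snprintf(sketch_reported, sizeof(sketch_reported), "%s", sketch_errmsg);
		longjmp(*sketch_exception_stack, 1);
	}
	sketch_exception_stack = save;	/* normal exit also pops the handler */
}

/* Top-level handler, like the worker's outer error recovery: returns 1 on error. */
static int
sketch_worker_main(int fail)
{
	jmp_buf		top;

	memset(sketch_reported, 0, sizeof(sketch_reported));
	if (setjmp(top) != 0)
	{
		sketch_exception_stack = NULL;
		return 1;
	}
	sketch_exception_stack = &top;
	sketch_run_apply(fail);
	sketch_exception_stack = NULL;
	return 0;
}
```

The point of re-throwing rather than swallowing the error is that the worker still exits and restarts as before; the catch block only adds a side effect.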
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index ff5aedc99c..2511df1c67 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -24,6 +24,7 @@
 #include "pgstat.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/postmaster.h"
+#include "replication/logicalproto.h"
 #include "replication/slot.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -2239,6 +2240,23 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+/* Reset the statistics of a subscription worker */
+Datum
+pg_stat_reset_subscription_worker(PG_FUNCTION_ARGS)
+{
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			relid;
+
+	if (PG_ARGISNULL(1))
+		relid = InvalidOid;		/* reset apply worker error stats */
+	else
+		relid = PG_GETARG_OID(1);	/* reset table sync worker error stats */
+
+	pgstat_reset_subworker_stats(subid, relid);
+
+	PG_RETURN_VOID();
+}
+
 Datum
 pg_stat_get_archiver(PG_FUNCTION_ARGS)
 {
@@ -2379,3 +2397,107 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
+
+/*
+ * Get the subscription worker statistics for the given subscription
+ * (and relation).
+ */
+Datum
+pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS 9
+	Oid			subid = PG_GETARG_OID(0);
+	Oid			subrelid;
+	TupleDesc	tupdesc;
+	Datum		values[PG_STAT_GET_SUBSCRIPTION_WORKER_COLS];
+	bool		nulls[PG_STAT_GET_SUBSCRIPTION_WORKER_COLS];
+	PgStat_StatSubWorkerEntry *wentry;
+	int			i;
+
+	if (PG_ARGISNULL(1))
+		subrelid = InvalidOid;
+	else
+		subrelid = PG_GETARG_OID(1);
+
+	/* Initialise attributes information in the tuple descriptor */
+	tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_SUBSCRIPTION_WORKER_COLS);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "subid",
+					   OIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid",
+					   OIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "relid",
+					   OIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "command",
+					   TEXTOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "xid",
+					   XIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "error_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "error_message",
+					   TEXTOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_time",
+					   TIMESTAMPTZOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "stats_reset",
+					   TIMESTAMPTZOID, -1, 0);
+	BlessTupleDesc(tupdesc);
+
+	/* Get subscription worker stats */
+	wentry = pgstat_fetch_subworker(subid, subrelid);
+
+	/* Return NULL if there are no worker statistics */
+	if (wentry == NULL)
+		PG_RETURN_NULL();
+
+	/* Initialise values and NULL flags arrays */
+	MemSet(values, 0, sizeof(values));
+	MemSet(nulls, 0, sizeof(nulls));
+
+	i = 0;
+	/* subid */
+	values[i++] = ObjectIdGetDatum(subid);
+
+	/* subrelid */
+	if (OidIsValid(subrelid))
+		values[i++] = ObjectIdGetDatum(subrelid);
+	else
+		nulls[i++] = true;
+
+	/* relid */
+	if (OidIsValid(wentry->relid))
+		values[i++] = ObjectIdGetDatum(wentry->relid);
+	else
+		nulls[i++] = true;
+
+	/* command */
+	if (wentry->command != 0)
+		values[i++] = CStringGetTextDatum(logicalrep_message_type(wentry->command));
+	else
+		nulls[i++] = true;
+
+	/* xid */
+	if (TransactionIdIsValid(wentry->xid))
+		values[i++] = TransactionIdGetDatum(wentry->xid);
+	else
+		nulls[i++] = true;
+
+	/* error_count */
+	values[i++] = Int64GetDatum(wentry->error_count);
+
+	/* error_message */
+	values[i++] = CStringGetTextDatum(wentry->error_message);
+
+	/* last_error_time */
+	if (wentry->error_time != 0)
+		values[i++] = TimestampTzGetDatum(wentry->error_time);
+	else
+		nulls[i++] = true;
+
+	/* stats_reset */
+	if (wentry->stat_reset_timestamp != 0)
+		values[i++] = TimestampTzGetDatum(wentry->stat_reset_timestamp);
+	else
+		nulls[i++] = true;
+
+	/* Returns the record as Datum */
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
+}
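pg_stat_get_subscription_worker() fills values[] and nulls[] positionally, advancing i in both branches so that a NULL column never shifts the later ones. A reduced sketch of that idiom, with four of the nine columns and stand-in types (SketchDatum and SketchWorkerStats are hypothetical; 0 plays the role of InvalidOid, InvalidTransactionId and "never"):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define SKETCH_NCOLS 4

typedef uint64_t SketchDatum;	/* stand-in for Datum */

typedef struct
{
	uint32_t	relid;
	uint32_t	xid;
	int64_t		error_count;
	int64_t		error_time;
} SketchWorkerStats;

/*
 * Fill one output row.  Every column advances i exactly once, whether it
 * stores a value or sets its NULL flag, so column positions stay aligned.
 */
static int
sketch_fill_row(const SketchWorkerStats *w,
				SketchDatum values[SKETCH_NCOLS], bool nulls[SKETCH_NCOLS])
{
	int			i = 0;

	for (int c = 0; c < SKETCH_NCOLS; c++)
	{
		values[c] = 0;
		nulls[c] = false;
	}

	/* relid */
	if (w->relid != 0)
		values[i++] = w->relid;
	else
		nulls[i++] = true;

	/* xid */
	if (w->xid != 0)
		values[i++] = w->xid;
	else
		nulls[i++] = true;

	/* error_count is never NULL */
	values[i++] = (SketchDatum) w->error_count;

	/* last_error_time */
	if (w->error_time != 0)
		values[i++] = (SketchDatum) w->error_time;
	else
		nulls[i++] = true;

	assert(i == SKETCH_NCOLS);	/* every column was visited exactly once */
	return i;
}
```

A final check like the assert above is a cheap guard against adding a column to the tuple descriptor but forgetting its branch.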
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index d068d6532e..e6c7abbdcc 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5385,6 +5385,14 @@
   proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
   proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slot' },
+{ oid => '8523', descr => 'statistics: information about subscription worker',
+  proname => 'pg_stat_get_subscription_worker', prorows => '1', proisstrict => 'f',
+  proretset => 't', provolatile => 's', proparallel => 'r',
+  prorettype => 'record', proargtypes => 'oid oid',
+  proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz,timestamptz}',
+  proargmodes => '{i,i,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subrelid,subid,subrelid,relid,command,xid,error_count,error_message,last_error_time,stats_reset}',
+  prosrc => 'pg_stat_get_subscription_worker' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
@@ -5772,6 +5780,11 @@
   proname => 'pg_stat_reset_replication_slot', proisstrict => 'f',
   provolatile => 'v', prorettype => 'void', proargtypes => 'text',
   prosrc => 'pg_stat_reset_replication_slot' },
+{ oid => '8524',
+  descr => 'statistics: reset collected statistics for a single subscription worker',
+  proname => 'pg_stat_reset_subscription_worker', proisstrict => 'f',
+  provolatile => 'v', prorettype => 'void', proargtypes => 'oid oid',
+  prosrc => 'pg_stat_reset_subscription_worker' },
 
 { oid => '3163', descr => 'current trigger depth',
   proname => 'pg_trigger_depth', provolatile => 's', proparallel => 'r',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index bcd3588ea2..7a26d6db3f 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -14,6 +14,7 @@
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h"	/* for MAX_XFN_CHARS */
+#include "replication/logicalproto.h"
 #include "utils/backend_progress.h" /* for backward compatibility */
 #include "utils/backend_status.h"	/* for backward compatibility */
 #include "utils/hsearch.h"
@@ -66,6 +67,7 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_RESETSINGLECOUNTER,
 	PGSTAT_MTYPE_RESETSLRUCOUNTER,
 	PGSTAT_MTYPE_RESETREPLSLOTCOUNTER,
+	PGSTAT_MTYPE_RESETSUBWORKERCOUNTER,
 	PGSTAT_MTYPE_AUTOVAC_START,
 	PGSTAT_MTYPE_VACUUM,
 	PGSTAT_MTYPE_ANALYZE,
@@ -83,6 +85,9 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_REPLSLOT,
 	PGSTAT_MTYPE_CONNECT,
 	PGSTAT_MTYPE_DISCONNECT,
+	PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
+	PGSTAT_MTYPE_SUBWORKERERROR,
+	PGSTAT_MTYPE_SUBWORKERPURGE,
 } StatMsgType;
 
 /* ----------
@@ -389,6 +394,24 @@ typedef struct PgStat_MsgResetreplslotcounter
 	bool		clearall;
 } PgStat_MsgResetreplslotcounter;
 
+/* ----------
+ * PgStat_MsgResetsubworkercounter	Sent by the backend to reset the subscription
+ *									worker statistics.
+ * ----------
+ */
+typedef struct PgStat_MsgResetsubworkercounter
+{
+	PgStat_MsgHdr m_hdr;
+
+	/*
+	 * Same as PgStat_MsgSubWorkerError, m_subid and m_subrelid are used to
+	 * determine the subscription and the reporter of the error: the apply
+	 * worker or the table sync worker.
+	 */
+	Oid			m_subid;
+	Oid			m_subrelid;
+} PgStat_MsgResetsubworkercounter;
+
 /* ----------
  * PgStat_MsgAutovacStart		Sent by the autovacuum daemon to signal
  *								that a database is going to be processed
@@ -536,6 +559,68 @@ typedef struct PgStat_MsgReplSlot
 	PgStat_Counter m_total_bytes;
 } PgStat_MsgReplSlot;
 
+/* ----------
+ * PgStat_MsgSubscriptionPurge	Sent by the backend and autovacuum to tell the
+ *								collector about the dead subscriptions.
+ * ----------
+ */
+#define PGSTAT_NUM_SUBSCRIPTIONPURGE  \
+	((PGSTAT_MSG_PAYLOAD - sizeof(int)) / sizeof(Oid))
+
+typedef struct PgStat_MsgSubscriptionPurge
+{
+	PgStat_MsgHdr m_hdr;
+	int			m_nentries;
+	Oid			m_subids[PGSTAT_NUM_SUBSCRIPTIONPURGE];
+} PgStat_MsgSubscriptionPurge;
+
+/* ----------
+ * PgStat_MsgSubWorkerPurge		Sent by the backend and autovacuum to purge
+ *								the subscription worker statistics.
+ * ----------
+ */
+#define PGSTAT_NUM_SUBWORKERPURGE  \
+	((PGSTAT_MSG_PAYLOAD - sizeof(Oid) - sizeof(int)) / sizeof(Oid))
+
+typedef struct PgStat_MsgSubWorkerPurge
+{
+	PgStat_MsgHdr m_hdr;
+	Oid			m_subid;
+	int			m_nentries;
+	Oid			m_relids[PGSTAT_NUM_SUBWORKERPURGE];
+} PgStat_MsgSubWorkerPurge;
+
+/* ----------
+ * PgStat_MsgSubWorkerError		Sent by the apply worker or the table sync worker to
+ *								report an error that occurred during logical replication.
+ * ----------
+ */
+#define PGSTAT_SUBWORKERERROR_MSGLEN 256
+typedef struct PgStat_MsgSubWorkerError
+{
+	PgStat_MsgHdr m_hdr;
+
+	/*
+	 * m_subid and m_subrelid are used to determine the subscription and the
+	 * reporter of the error.  m_subrelid is InvalidOid if the error was
+	 * reported by the apply worker, otherwise by a table sync worker.
+	 */
+	Oid			m_subid;
+	Oid			m_subrelid;
+
+	/*
+	 * OIDs of the database and the table that the reporter was actually
+	 * processing.  m_relid can be InvalidOid if the error occurred while the
+	 * worker was applying a non-data-modification message such as RELATION.
+	 */
+	Oid			m_dbid;
+	Oid			m_relid;
+
+	LogicalRepMsgType m_command;
+	TransactionId m_xid;
+	TimestampTz m_timestamp;
+	char		m_message[PGSTAT_SUBWORKERERROR_MSGLEN];
+} PgStat_MsgSubWorkerError;
 
 /* ----------
  * PgStat_MsgRecoveryConflict	Sent by the backend upon recovery conflict
@@ -697,6 +782,7 @@ typedef union PgStat_Msg
 	PgStat_MsgResetsinglecounter msg_resetsinglecounter;
 	PgStat_MsgResetslrucounter msg_resetslrucounter;
 	PgStat_MsgResetreplslotcounter msg_resetreplslotcounter;
+	PgStat_MsgResetsubworkercounter msg_resetsubworkercounter;
 	PgStat_MsgAutovacStart msg_autovacuum_start;
 	PgStat_MsgVacuum msg_vacuum;
 	PgStat_MsgAnalyze msg_analyze;
@@ -714,6 +800,9 @@ typedef union PgStat_Msg
 	PgStat_MsgReplSlot msg_replslot;
 	PgStat_MsgConnect msg_connect;
 	PgStat_MsgDisconnect msg_disconnect;
+	PgStat_MsgSubscriptionPurge msg_subscriptionpurge;
+	PgStat_MsgSubWorkerError msg_subworkererror;
+	PgStat_MsgSubWorkerPurge msg_subworkerpurge;
 } PgStat_Msg;
 
 
@@ -929,6 +1018,36 @@ typedef struct PgStat_StatReplSlotEntry
 	TimestampTz stat_reset_timestamp;
 } PgStat_StatReplSlotEntry;
 
+/* The lookup key for subscription worker hash table */
+typedef struct PgStat_StatSubWorkerKey
+{
+	Oid			subid;
+	Oid			subrelid;		/* InvalidOid for apply worker, otherwise for
+								 * table sync worker */
+} PgStat_StatSubWorkerKey;
+
+/*
+ * Logical replication apply worker and table sync worker statistics kept in the
+ * stats collector.
+ */
+typedef struct PgStat_StatSubWorkerEntry
+{
+	PgStat_StatSubWorkerKey key;	/* hash key (must be first) */
+
+	/*
+	 * Subscription worker error statistics representing an error that
+	 * occurred during application of logical replication or the initial table
+	 * synchronization.
+	 */
+	Oid			dbid;
+	Oid			relid;
+	LogicalRepMsgType command;
+	TransactionId xid;
+	PgStat_Counter error_count;
+	TimestampTz error_time;
+	char		error_message[PGSTAT_SUBWORKERERROR_MSGLEN];
+	TimestampTz stat_reset_timestamp;
+} PgStat_StatSubWorkerEntry;
 
 /*
  * Working state needed to accumulate per-function-call timing statistics.
@@ -1022,6 +1141,7 @@ extern void pgstat_reset_shared_counters(const char *);
 extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type);
 extern void pgstat_reset_slru_counter(const char *);
 extern void pgstat_reset_replslot_counter(const char *name);
+extern void pgstat_reset_subworker_stats(Oid subid, Oid subrelid);
 
 extern void pgstat_report_connect(Oid dboid);
 extern void pgstat_report_autovac(Oid dboid);
@@ -1038,6 +1158,11 @@ extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
 extern void pgstat_report_replslot_create(const char *slotname);
 extern void pgstat_report_replslot_drop(const char *slotname);
+extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
+										  LogicalRepMsgType command,
+										  TransactionId xid, const char *errmsg);
+extern void pgstat_report_subworker_drop(Oid subid, Oid subrelid);
+extern void pgstat_report_subscription_drop(Oid subid);
 
 extern void pgstat_initialize(void);
 
@@ -1136,6 +1261,7 @@ extern PgStat_GlobalStats *pgstat_fetch_global(void);
 extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
 extern PgStat_SLRUStats *pgstat_fetch_slru(void);
 extern PgStat_StatReplSlotEntry *pgstat_fetch_replslot(NameData slotname);
+extern PgStat_StatSubWorkerEntry *pgstat_fetch_subworker(Oid subid, Oid subrelid);
 
 extern void pgstat_count_slru_page_zeroed(int slru_idx);
 extern void pgstat_count_slru_page_hit(int slru_idx);
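The purge messages size their OID arrays so the whole message fits in one PGSTAT_MAX_MSG_SIZE datagram. Assuming PGSTAT_MAX_MSG_SIZE is 1000 and PgStat_MsgHdr is two 4-byte ints, as in pgstat.h, and that int and Oid are 4 bytes, PGSTAT_NUM_SUBSCRIPTIONPURGE works out to (992 - 4) / 4 = 247 OIDs, and PGSTAT_NUM_SUBWORKERPURGE to (992 - 4 - 4) / 4 = 246. The sketch mirrors the first definition with hypothetical names and shows how a sender would batch a longer OID list into several messages; the actual sending is left as a comment.

```c
#include <assert.h>
#include <stddef.h>

typedef unsigned int Oid;

/* Mirrors of the pgstat.h definitions this sketch assumes. */
#define SKETCH_MAX_MSG_SIZE 1000	/* PGSTAT_MAX_MSG_SIZE */

typedef struct { int m_type; int m_size; } SketchMsgHdr;	/* PgStat_MsgHdr */

#define SKETCH_MSG_PAYLOAD (SKETCH_MAX_MSG_SIZE - sizeof(SketchMsgHdr))
#define SKETCH_NUM_SUBSCRIPTIONPURGE \
	((SKETCH_MSG_PAYLOAD - sizeof(int)) / sizeof(Oid))

typedef struct
{
	SketchMsgHdr m_hdr;
	int			m_nentries;
	Oid			m_subids[SKETCH_NUM_SUBSCRIPTIONPURGE];
} SketchMsgSubscriptionPurge;

/* The whole message must fit in one datagram. */
_Static_assert(sizeof(SketchMsgSubscriptionPurge) <= SKETCH_MAX_MSG_SIZE,
			   "purge message too large");

/* Split n subscription OIDs into full messages; return how many were "sent". */
static int
sketch_send_purges(const Oid *subids, size_t n)
{
	SketchMsgSubscriptionPurge msg;
	int			nmsgs = 0;

	assert(subids != NULL || n == 0);
	msg.m_nentries = 0;
	for (size_t i = 0; i < n; i++)
	{
		msg.m_subids[msg.m_nentries++] = subids[i];
		if (msg.m_nentries >= (int) SKETCH_NUM_SUBSCRIPTIONPURGE)
		{
			nmsgs++;			/* a real sender would transmit msg here */
			msg.m_nentries = 0;
		}
	}
	if (msg.m_nentries > 0)
		nmsgs++;				/* flush the partially filled last message */
	return nmsgs;
}
```

For example, 500 dropped subscriptions would need three messages: two full ones of 247 OIDs and one carrying the remaining 6.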
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 2fa00a3c29..f6b1bd657f 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2094,6 +2094,26 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_time
    FROM (pg_subscription su
      LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+pg_stat_subscription_workers| SELECT w.subid,
+    s.subname,
+    w.subrelid,
+    w.relid,
+    w.command,
+    w.xid,
+    w.error_count,
+    w.error_message,
+    w.last_error_time,
+    w.stats_reset
+   FROM ( SELECT pg_subscription.oid AS subid,
+            NULL::oid AS relid
+           FROM pg_subscription
+        UNION ALL
+         SELECT pg_subscription_rel.srsubid AS subid,
+            pg_subscription_rel.srrelid AS relid
+           FROM pg_subscription_rel
+          WHERE (pg_subscription_rel.srsubstate <> 'r'::"char")) sr,
+    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, relid, command, xid, error_count, error_message, last_error_time, stats_reset)
+     JOIN pg_subscription s ON ((w.subid = s.oid)));
 pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
     pg_stat_all_indexes.indexrelid,
     pg_stat_all_indexes.schemaname,
diff --git a/src/test/subscription/t/026_error_report.pl b/src/test/subscription/t/026_error_report.pl
new file mode 100644
index 0000000000..3d23bb55d4
--- /dev/null
+++ b/src/test/subscription/t/026_error_report.pl
@@ -0,0 +1,156 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Tests for subscription error reporting.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 5;
+
+# Check that the error reported in the pg_stat_subscription_workers view is as expected.
+sub test_subscription_error
+{
+    my ($node, $relname, $xid, $expected_error, $msg) = @_;
+
+    my $check_sql = qq[
+SELECT count(1) > 0 FROM pg_stat_subscription_workers
+WHERE relid = '$relname'::regclass];
+    $check_sql .= " AND xid = '$xid'::xid;" if $xid ne '';
+
+    # Wait for the error statistics to be updated.
+    $node->poll_query_until(
+	'postgres', $check_sql,
+) or die "Timed out while waiting for statistics to be updated";
+
+    my $result = $node->safe_psql(
+	'postgres',
+	qq[
+SELECT subname, command, relid::regclass, error_count > 0
+FROM pg_stat_subscription_workers
+WHERE relid = '$relname'::regclass;
+]);
+    is($result, $expected_error, $msg);
+}
+
+# Create publisher node.
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+			     qq[
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+]);
+$node_publisher->start;
+
+# Create subscriber node.
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+
+# The subscriber will enter an infinite error loop, so we don't want
+# to overflow the server log with error messages.
+$node_subscriber->append_conf('postgresql.conf',
+			      qq[
+max_prepared_transactions = 10
+wal_retrieve_retry_interval = 5s
+]);
+$node_subscriber->start;
+
+# Initial table setup on both publisher and subscriber. On the subscriber we create
+# the same tables but with primary keys. Also, insert some data that will conflict
+# with the data replicated from the publisher later.
+$node_publisher->safe_psql(
+    'postgres',
+    q[
+BEGIN;
+CREATE TABLE test_tab1 (a int);
+CREATE TABLE test_tab2 (a int);
+CREATE TABLE test_tab_streaming (a int, b text);
+INSERT INTO test_tab1 VALUES (1);
+INSERT INTO test_tab2 VALUES (1);
+COMMIT;
+]);
+$node_subscriber->safe_psql(
+    'postgres',
+    q[
+BEGIN;
+CREATE TABLE test_tab1 (a int primary key);
+CREATE TABLE test_tab2 (a int primary key);
+CREATE TABLE test_tab_streaming (a int primary key, b text);
+INSERT INTO test_tab2 VALUES (1);
+INSERT INTO test_tab_streaming SELECT 10000, md5(10000::text);
+COMMIT;
+]);
+
+# Setup publications.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+			   q[
+CREATE PUBLICATION tap_pub FOR TABLE test_tab1, test_tab2;
+CREATE PUBLICATION tap_pub_streaming FOR TABLE test_tab_streaming;
+]);
+
+# Check that there are no subscription errors before starting logical replication.
+my $result =
+    $node_subscriber->safe_psql('postgres',
+				"SELECT count(1) FROM pg_stat_subscription_workers");
+is($result, qq(0), 'check no subscription error');
+
+# Create subscriptions. The table sync for test_tab2 on tap_sub will enter an
+# infinite error loop due to a unique constraint violation.
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+    'postgres',
+    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = off, two_phase = on);");
+my $appname_streaming = 'tap_sub_streaming';
+$node_subscriber->safe_psql(
+    'postgres',
+    "CREATE SUBSCRIPTION tap_sub_streaming CONNECTION '$publisher_connstr application_name=$appname_streaming' PUBLICATION tap_pub_streaming WITH (streaming = on, two_phase = on);");
+
+$node_publisher->wait_for_catchup($appname);
+$node_publisher->wait_for_catchup($appname_streaming);
+
+# Wait for initial table sync for test_tab1 and test_tab_streaming to finish.
+$node_subscriber->poll_query_until('postgres',
+				   q[
+SELECT count(1) = 2 FROM pg_subscription_rel
+WHERE srrelid in ('test_tab1'::regclass, 'test_tab_streaming'::regclass) AND srsubstate in ('r', 's')
+]) or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check the initial data.
+$result = $node_subscriber->safe_psql('postgres',
+				      "SELECT count(a) FROM test_tab1");
+is($result, q(1), 'check initial data are copied to subscriber');
+
+# Insert more data into test_tab1, raising an error on the subscriber due to a
+# violation of the unique constraint on test_tab1.
+my $xid = $node_publisher->safe_psql(
+    'postgres',
+    qq[
+BEGIN;
+INSERT INTO test_tab1 VALUES (1);
+SELECT pg_current_xact_id()::xid;
+COMMIT;
+]);
+test_subscription_error($node_subscriber, 'test_tab1', $xid,
+			qq(tap_sub|INSERT|test_tab1|t),
+			'check the error reported by the apply worker');
+
+# Check the table sync worker's error in the view.
+test_subscription_error($node_subscriber, 'test_tab2', '',
+			qq(tap_sub||test_tab2|t),
+			'check the error reported by the table sync worker');
+
+# Check that the view shows no entries after dropping the subscriptions.
+$node_subscriber->safe_psql(
+    'postgres',
+    q[
+DROP SUBSCRIPTION tap_sub;
+DROP SUBSCRIPTION tap_sub_streaming;
+]);
+$result = $node_subscriber->safe_psql('postgres',
+				      "SELECT count(1) FROM pg_stat_subscription_workers");
+is($result, q(0), 'no error after dropping subscription');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 7bbbb34e2f..12cce497a3 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1942,7 +1942,11 @@ PgStat_MsgResetreplslotcounter
 PgStat_MsgResetsharedcounter
 PgStat_MsgResetsinglecounter
 PgStat_MsgResetslrucounter
+PgStat_MsgResetsubworkercounter
 PgStat_MsgSLRU
+PgStat_MsgSubWorkerError
+PgStat_MsgSubWorkerPurge
+PgStat_MsgSubscriptionPurge
 PgStat_MsgTabpurge
 PgStat_MsgTabstat
 PgStat_MsgTempFile
@@ -1954,6 +1958,8 @@ PgStat_Single_Reset_Type
 PgStat_StatDBEntry
 PgStat_StatFuncEntry
 PgStat_StatReplSlotEntry
+PgStat_StatSubWorkerEntry
+PgStat_StatSubWorkerKey
 PgStat_StatTabEntry
 PgStat_SubXactStatus
 PgStat_TableCounts
-- 
2.24.3 (Apple Git-128)
