On Thu, Sep 07, 2023 at 12:36:29PM +1200, Peter Smith wrote:
> Modified as suggested. PSA v3.

Thanks.  I've attached v4 with a couple of small changes.  Notably, I've
moved the worker_type column to before the pid column, as it felt more
natural to me to keep the PID columns together.  I've also added an
elog(ERROR, ...) for WORKERTYPE_UNKNOWN, as that seems to be the standard
practice elsewhere.  That being said, are we absolutely confident that this
really cannot happen?  I haven't looked too closely, but if there is a
small race or something that could cause us to see a worker with this type,
perhaps it would be better to actually list it as "unknown".  Thoughts?

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From 85b826752b1b10176809918975ce7b495dadd3db Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Thu, 7 Sep 2023 15:24:23 -0700
Subject: [PATCH v4 1/1] add worker type to pg_stat_subscription

---
 doc/src/sgml/monitoring.sgml               | 11 +++++++++++
 src/backend/catalog/system_views.sql       |  1 +
 src/backend/replication/logical/launcher.c | 18 +++++++++++++++++-
 src/include/catalog/pg_proc.dat            |  6 +++---
 src/test/regress/expected/rules.out        |  3 ++-
 5 files changed, 34 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 4ff415d6a0..17f9323f23 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1993,6 +1993,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>worker_type</structfield> <type>text</type>
+      </para>
+      <para>
+       Type of the subscription worker process.  Possible types are
+       <literal>apply</literal>, <literal>parallel apply</literal>, and
+       <literal>table synchronization</literal>.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>pid</structfield> <type>integer</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 77b06e2a7a..fcb14976c0 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -949,6 +949,7 @@ CREATE VIEW pg_stat_subscription AS
     SELECT
             su.oid AS subid,
             su.subname,
+            st.worker_type,
             st.pid,
             st.leader_pid,
             st.relid,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 7882fc91ce..501910b445 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1278,7 +1278,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
 Datum
 pg_stat_get_subscription(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_COLS	9
+#define PG_STAT_GET_SUBSCRIPTION_COLS	10
 	Oid			subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
 	int			i;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1339,6 +1339,22 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
 		else
 			values[8] = TimestampTzGetDatum(worker.reply_time);
 
+		switch (worker.type)
+		{
+			case WORKERTYPE_APPLY:
+				values[9] = CStringGetTextDatum("apply");
+				break;
+			case WORKERTYPE_PARALLEL_APPLY:
+				values[9] = CStringGetTextDatum("parallel apply");
+				break;
+			case WORKERTYPE_TABLESYNC:
+				values[9] = CStringGetTextDatum("table synchronization");
+				break;
+			case WORKERTYPE_UNKNOWN:
+				/* Should never happen. */
+				elog(ERROR, "unknown worker type");
+		}
+
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
 							 values, nulls);
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9805bc6118..f0b7b9cbd8 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5484,9 +5484,9 @@
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
+  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}',
   prosrc => 'pg_stat_get_subscription' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5058be5411..2c60400ade 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2118,6 +2118,7 @@ pg_stat_ssl| SELECT pid,
   WHERE (client_port IS NOT NULL);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
+    st.worker_type,
     st.pid,
     st.leader_pid,
     st.relid,
@@ -2127,7 +2128,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_lsn,
     st.latest_end_time
    FROM (pg_subscription su
-     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid)));
 pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
-- 
2.25.1

Reply via email to