On Thu, Sep 14, 2023 at 03:04:19PM -0700, Nathan Bossart wrote:
> The only reason I didn't apply this already is because IMHO we should
> adjust the worker types and the documentation for the view to be
> consistent. For example, the docs say "leader apply worker" but the view
> just calls them "apply" workers. The docs say "synchronization worker" but
> the view calls them "table synchronization" workers. My first instinct is
> to call apply workers "leader apply" workers in the view, and to call table
> synchronization workers "table synchronization workers" in the docs.

Concretely, like this.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com

From 965dc5f05eee9e1c6cac1374d5800fff1ea5cba2 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Thu, 14 Sep 2023 14:31:44 -0700
Subject: [PATCH v7 1/1] Add worker type to pg_stat_subscription.

Thanks to 2a8b40e368, the logical replication worker type is easily
determined, and this information is a nice addition to the
pg_stat_subscription view. The worker type could already be deduced via
other columns such as leader_pid and relid, but that is unnecessary
complexity for users.

Bumps catversion.

Author: Peter Smith
Reviewed-by: Michael Paquier, Maxim Orlov
Discussion: https://postgr.es/m/CAHut%2BPtmbSMfErSk0S7xxVdZJ9XVE3xVLhqBTmT91kf57BeKDQ%40mail.gmail.com
---
 doc/src/sgml/monitoring.sgml               | 13 ++++++++++++-
 src/backend/catalog/system_views.sql       |  1 +
 src/backend/replication/logical/launcher.c | 18 +++++++++++++++++-
 src/include/catalog/catversion.h           |  2 +-
 src/include/catalog/pg_proc.dat            |  6 +++---
 src/test/regress/expected/rules.out        |  3 ++-
 src/test/subscription/t/004_sync.pl        |  2 +-
 7 files changed, 37 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 4ff415d6a0..d2328eb85d 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1993,6 +1993,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
 </para></entry> </row> + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>worker_type</structfield> <type>text</type> + </para> + <para> + Type of the subscription worker process. Possible types are + <literal>leader apply</literal>, <literal>parallel apply</literal>, and + <literal>table synchronization</literal>. 
+ </para></entry> + </row> + <row> <entry role="catalog_table_entry"><para role="column_definition"> <structfield>pid</structfield> <type>integer</type> @@ -2008,7 +2019,7 @@ description | Waiting for a newly initialized WAL file to reach durable storage </para> <para> Process ID of the leader apply worker if this process is a parallel - apply worker; NULL if this process is a leader apply worker or a + apply worker; NULL if this process is a leader apply worker or a table synchronization worker </para></entry> </row> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 77b06e2a7a..fcb14976c0 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -949,6 +949,7 @@ CREATE VIEW pg_stat_subscription AS SELECT su.oid AS subid, su.subname, + st.worker_type, st.pid, st.leader_pid, st.relid, diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 7882fc91ce..54ab8a37f4 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -1278,7 +1278,7 @@ GetLeaderApplyWorkerPid(pid_t pid) Datum pg_stat_get_subscription(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_COLS 9 +#define PG_STAT_GET_SUBSCRIPTION_COLS 10 Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); int i; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; @@ -1339,6 +1339,22 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) else values[8] = TimestampTzGetDatum(worker.reply_time); + switch (worker.type) + { + case WORKERTYPE_APPLY: + values[9] = CStringGetTextDatum("leader apply"); + break; + case WORKERTYPE_PARALLEL_APPLY: + values[9] = CStringGetTextDatum("parallel apply"); + break; + case WORKERTYPE_TABLESYNC: + values[9] = CStringGetTextDatum("table synchronization"); + break; + case WORKERTYPE_UNKNOWN: + /* Should never happen. 
*/ + elog(ERROR, "unknown worker type"); + } + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 4eaef54d0c..f1f6c5855b 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202309061 +#define CATALOG_VERSION_NO 202309151 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9805bc6118..f0b7b9cbd8 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5484,9 +5484,9 @@ proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}', + proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}', prosrc => 'pg_stat_get_subscription' }, { oid => '2026', descr => 'statistics: current backend PID', proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r', diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 5058be5411..2c60400ade 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2118,6 +2118,7 @@ pg_stat_ssl| SELECT pid, WHERE (client_port IS NOT NULL); pg_stat_subscription| SELECT su.oid AS subid, su.subname, + st.worker_type, st.pid, st.leader_pid, st.relid, @@ -2127,7 +2128,7 @@ 
pg_stat_subscription| SELECT su.oid AS subid, st.latest_end_lsn, st.latest_end_time FROM (pg_subscription su - LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid))); + LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid))); pg_stat_subscription_stats| SELECT ss.subid, s.subname, ss.apply_error_count, diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl index bf4d59efba..f25c9f6408 100644 --- a/src/test/subscription/t/004_sync.pl +++ b/src/test/subscription/t/004_sync.pl @@ -80,7 +80,7 @@ $node_subscriber->safe_psql('postgres', # wait for it to start $node_subscriber->poll_query_until('postgres', - "SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL" + "SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND worker_type = 'leader apply'" ) or die "Timed out while waiting for subscriber to start"; # and drop both subscriptions -- 2.25.1