On Sat, Sep 2, 2023 at 7:41 AM Nathan Bossart <nathandboss...@gmail.com> wrote:
Thanks for your interest in this patch.

> Is there any reason not to spell out the names? I think that would match
> the other system views better (e.g., backend_type in pg_stat_activity).

I had thought it might be simpler in case someone wanted to query by type. But your suggestion for consistency is probably better, so I changed it to work that way. The documentation is also simplified to match the other 'backend_type' you cited.

> Also, instead of "tablesync worker", I'd suggest using "synchronization
> worker" to match the name used elsewhere in this table.

Changed to "table synchronization worker".

> I see that the table refers to "leader apply workers". Would those show up
> as parallel apply workers in the view? Can we add another worker type for
> those?

Internally there are only 3 worker types. A "leader" apply worker is basically the same as a regular apply worker, except that it has parallel apply workers associated with it. I felt that pretending there are 4 types in the view would be confusing. Instead, I just removed the word "leader". Now there are:

"apply worker"
"parallel apply worker"
"table synchronization worker"

Please see the attached patch v2.

------
Kind Regards,
Peter Smith.
Fujitsu Australia
From bac581d9f6843b3df0dd5fc45e318594a7921ee6 Mon Sep 17 00:00:00 2001 From: Peter Smith <peter.b.smith@fujitsu.com> Date: Mon, 4 Sep 2023 13:58:16 +1000 Subject: [PATCH v2] Add worker_type to pg_stat_subscription --- doc/src/sgml/monitoring.sgml | 12 +++++++ src/backend/catalog/system_views.sql | 1 + src/backend/replication/logical/launcher.c | 55 +++++++++++++++++++++--------- src/include/catalog/pg_proc.dat | 6 ++-- src/test/regress/expected/rules.out | 3 +- 5 files changed, 56 insertions(+), 21 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 4ff415d..45b9ccf 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2004,6 +2004,18 @@ description | Waiting for a newly initialized WAL file to reach durable storage <row> <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>worker_type</structfield> <type>text</type> + </para> + <para> + Type of the subscription worker process. Possible types are: + <literal>apply worker</literal>, + <literal>parallel apply worker</literal>, + <literal>table synchronization worker</literal>. 
+ </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> <structfield>leader_pid</structfield> <type>integer</type> </para> <para> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 77b06e2..3b7f5c2 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -950,6 +950,7 @@ CREATE VIEW pg_stat_subscription AS su.oid AS subid, su.subname, st.pid, + st.worker_type, st.leader_pid, st.relid, st.received_lsn, diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 7882fc9..0e2fbaf 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -1278,7 +1278,7 @@ GetLeaderApplyWorkerPid(pid_t pid) Datum pg_stat_get_subscription(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_COLS 9 +#define PG_STAT_GET_SUBSCRIPTION_COLS 10 Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); int i; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; @@ -1304,40 +1304,61 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) if (OidIsValid(subid) && worker.subid != subid) continue; + values[0] = ObjectIdGetDatum(worker.subid); + worker_pid = worker.proc->pid; + values[1] = Int32GetDatum(worker_pid); + + switch (worker.type) + { + case WORKERTYPE_APPLY: + values[2] = CStringGetTextDatum("apply worker"); + break; + case WORKERTYPE_PARALLEL_APPLY: + values[2] = CStringGetTextDatum("parallel apply worker"); + break; + case WORKERTYPE_TABLESYNC: + values[2] = CStringGetTextDatum("table synchronization worker"); + break; + case WORKERTYPE_UNKNOWN: /* should not be possible */ + nulls[2] = true; + } - values[0] = ObjectIdGetDatum(worker.subid); - if (isTablesyncWorker(&worker)) - values[1] = ObjectIdGetDatum(worker.relid); - else - nulls[1] = true; - values[2] = Int32GetDatum(worker_pid); if (isParallelApplyWorker(&worker)) values[3] = 
Int32GetDatum(worker.leader_pid); else nulls[3] = true; - if (XLogRecPtrIsInvalid(worker.last_lsn)) + if (isTablesyncWorker(&worker)) + values[4] = ObjectIdGetDatum(worker.relid); + else nulls[4] = true; + + if (XLogRecPtrIsInvalid(worker.last_lsn)) + nulls[5] = true; else - values[4] = LSNGetDatum(worker.last_lsn); + values[5] = LSNGetDatum(worker.last_lsn); + if (worker.last_send_time == 0) - nulls[5] = true; + nulls[6] = true; else - values[5] = TimestampTzGetDatum(worker.last_send_time); + values[6] = TimestampTzGetDatum(worker.last_send_time); + if (worker.last_recv_time == 0) - nulls[6] = true; + nulls[7] = true; else - values[6] = TimestampTzGetDatum(worker.last_recv_time); + values[7] = TimestampTzGetDatum(worker.last_recv_time); + if (XLogRecPtrIsInvalid(worker.reply_lsn)) - nulls[7] = true; + nulls[8] = true; else - values[7] = LSNGetDatum(worker.reply_lsn); + values[8] = LSNGetDatum(worker.reply_lsn); + if (worker.reply_time == 0) - nulls[8] = true; + nulls[9] = true; else - values[8] = TimestampTzGetDatum(worker.reply_time); + values[9] = TimestampTzGetDatum(worker.reply_time); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9805bc6..e65edf9 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5484,9 +5484,9 @@ proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}', + proallargtypes => '{oid,oid,int4,text,int4,oid,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', + 
proargnames => '{subid,subid,pid,worker_type,leader_pid,relid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}', prosrc => 'pg_stat_get_subscription' }, { oid => '2026', descr => 'statistics: current backend PID', proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r', diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 5058be5..f1f7f13 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2119,6 +2119,7 @@ pg_stat_ssl| SELECT pid, pg_stat_subscription| SELECT su.oid AS subid, su.subname, st.pid, + st.worker_type, st.leader_pid, st.relid, st.received_lsn, @@ -2127,7 +2128,7 @@ pg_stat_subscription| SELECT su.oid AS subid, st.latest_end_lsn, st.latest_end_time FROM (pg_subscription su - LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid))); + LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, pid, worker_type, leader_pid, relid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid))); pg_stat_subscription_stats| SELECT ss.subid, s.subname, ss.apply_error_count, -- 1.8.3.1