On Sat, Sep 16, 2023 at 06:09:48PM +0530, Amit Kapila wrote: > I think there is merit in keeping the worker type as 'sync' or > 'synchronization' because these would be used in the future for syncing > other objects like sequences. One more thing that looks slightly odd > is the 'leader apply' type of worker; won't this be confusing when > there is no parallel apply worker in the system? In this regard, > the existing documentation could probably also be improved.
These are good points. I went ahead and changed the patch back to reporting "apply" for [leader] apply workers and "synchronization" for table synchronization workers. I also adjusted a couple of the error messages that Michael pointed out so that they say "synchronization worker" instead of "table synchronization worker" or "tablesync worker". This still leaves room for confusion with the documentation's use of "leader apply worker", but I haven't touched that for now. -- Nathan Bossart Amazon Web Services: https://aws.amazon.com
>From 42c7b1910af29a8543ed65b2150e5eedae34a594 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Sat, 16 Sep 2023 13:21:54 -0700 Subject: [PATCH v8 1/1] Add worker type to pg_stat_subscription. Thanks to 2a8b40e368, the logical replication worker type is easily determined, and this information is a nice addition to the pg_stat_subscription view. The worker type could already be deduced via other columns such as leader_pid and relid, but that is unnecessary complexity for users. Bumps catversion. Author: Peter Smith Reviewed-by: Michael Paquier, Maxim Orlov, Amit Kapila Discussion: https://postgr.es/m/CAHut%2BPtmbSMfErSk0S7xxVdZJ9XVE3xVLhqBTmT91kf57BeKDQ%40mail.gmail.com --- doc/src/sgml/monitoring.sgml | 11 +++++++++++ src/backend/catalog/system_views.sql | 1 + src/backend/replication/logical/launcher.c | 18 +++++++++++++++++- src/backend/replication/logical/tablesync.c | 2 +- src/backend/replication/logical/worker.c | 6 +++--- src/include/catalog/catversion.h | 2 +- src/include/catalog/pg_proc.dat | 6 +++--- src/test/regress/expected/rules.out | 3 ++- src/test/subscription/t/004_sync.pl | 2 +- 9 files changed, 40 insertions(+), 11 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 4ff415d6a0..3546e8b3d9 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1993,6 +1993,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage </para></entry> </row> + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>worker_type</structfield> <type>text</type> + </para> + <para> + Type of the subscription worker process. Possible types are + <literal>apply</literal>, <literal>parallel apply</literal>, and + <literal>synchronization</literal>. 
+ </para></entry> + </row> + <row> <entry role="catalog_table_entry"><para role="column_definition"> <structfield>pid</structfield> <type>integer</type> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 77b06e2a7a..fcb14976c0 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -949,6 +949,7 @@ CREATE VIEW pg_stat_subscription AS SELECT su.oid AS subid, su.subname, + st.worker_type, st.pid, st.leader_pid, st.relid, diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 7882fc91ce..49e1c934a7 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -1278,7 +1278,7 @@ GetLeaderApplyWorkerPid(pid_t pid) Datum pg_stat_get_subscription(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_COLS 9 +#define PG_STAT_GET_SUBSCRIPTION_COLS 10 Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); int i; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; @@ -1339,6 +1339,22 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) else values[8] = TimestampTzGetDatum(worker.reply_time); + switch (worker.type) + { + case WORKERTYPE_APPLY: + values[9] = CStringGetTextDatum("apply"); + break; + case WORKERTYPE_PARALLEL_APPLY: + values[9] = CStringGetTextDatum("parallel apply"); + break; + case WORKERTYPE_TABLESYNC: + values[9] = CStringGetTextDatum("synchronization"); + break; + case WORKERTYPE_UNKNOWN: + /* Should never happen. 
*/ + elog(ERROR, "unknown worker type"); + } + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e2cee92cf2..cd71807088 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -151,7 +151,7 @@ finish_sync_worker(void) StartTransactionCommand(); ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", + (errmsg("logical replication synchronization worker for subscription \"%s\", table \"%s\" has finished", MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); CommitTransactionCommand(); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 597947410f..3894d741ac 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1065,7 +1065,7 @@ apply_handle_begin_prepare(StringInfo s) if (am_tablesync_worker()) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); + errmsg_internal("synchronization worker received a BEGIN PREPARE message"))); /* There must not be an active streaming transaction. 
*/ Assert(!TransactionIdIsValid(stream_xid)); @@ -1304,7 +1304,7 @@ apply_handle_stream_prepare(StringInfo s) if (am_tablesync_worker()) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("tablesync worker received a STREAM PREPARE message"))); + errmsg_internal("synchronization worker received a STREAM PREPARE message"))); logicalrep_read_stream_prepare(s, &prepare_data); set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn); @@ -4628,7 +4628,7 @@ InitializeLogRepWorker(void) if (am_tablesync_worker()) ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", + (errmsg("logical replication synchronization worker for subscription \"%s\", table \"%s\" has started", MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); else diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 4eaef54d0c..791d8ef313 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202309061 +#define CATALOG_VERSION_NO 202309161 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9805bc6118..f0b7b9cbd8 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5484,9 +5484,9 @@ proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}', + proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz,text}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', + proargnames => 
'{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,worker_type}', prosrc => 'pg_stat_get_subscription' }, { oid => '2026', descr => 'statistics: current backend PID', proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r', diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 5058be5411..2c60400ade 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2118,6 +2118,7 @@ pg_stat_ssl| SELECT pid, WHERE (client_port IS NOT NULL); pg_stat_subscription| SELECT su.oid AS subid, su.subname, + st.worker_type, st.pid, st.leader_pid, st.relid, @@ -2127,7 +2128,7 @@ pg_stat_subscription| SELECT su.oid AS subid, st.latest_end_lsn, st.latest_end_time FROM (pg_subscription su - LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid))); + LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, worker_type) ON ((st.subid = su.oid))); pg_stat_subscription_stats| SELECT ss.subid, s.subname, ss.apply_error_count, diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl index bf4d59efba..ee07d28b37 100644 --- a/src/test/subscription/t/004_sync.pl +++ b/src/test/subscription/t/004_sync.pl @@ -80,7 +80,7 @@ $node_subscriber->safe_psql('postgres', # wait for it to start $node_subscriber->poll_query_until('postgres', - "SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL" + "SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND worker_type = 'apply'" ) or die "Timed out while waiting for subscriber to start"; # and drop both subscriptions -- 2.25.1