I figured I'd post what I have so far since this thread hasn't been updated
in a while.  The attached patches are still "proof-of-concept grade," but
they are at least moving in the right direction (IMHO).  The variable
naming is still not great, and they are woefully undercommented, among
other things.

0001 introduces a new API for registering callbacks and running them in
parallel on all databases in the cluster.  This new system manages a set of
"slots" that follow a simple state machine to asynchronously establish a
connection and run the queries.  It uses select() to wait for these
asynchronous tasks to complete.  Users of this API only need to provide two
callbacks: one to return the query that should be run on each database and
another to process the results of that query.  If multiple queries are
required for each database, users can provide multiple sets of callbacks.
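
As a rough illustration of the callback contract and the slot state machine
(a toy model, not code from the patches), the sketch below replaces the libpq
pieces with stand-ins.  Everything in it is hypothetical: the names Task,
Step, Slot, task_add_step() and task_run() are made up, database names stand
in for DbInfo, and the "result" handed to the process callback is just the
query text echoed back instead of a PGresult.  Each state transition takes
exactly one pass over the slots, which real connections would not.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins: the patch passes DbInfo and PGresult instead. */
typedef char *(*GetQueryCB) (const char *dbname, void *arg);
typedef void (*ProcessCB) (const char *dbname, const char *result, void *arg);

typedef struct { GetQueryCB query_cb; ProcessCB process_cb; void *arg; } Step;
typedef struct { Step *steps; int nsteps; } Task;

typedef enum { SLOT_FREE, SLOT_CONNECTING, SLOT_RUNNING_QUERY } SlotState;
typedef struct { SlotState state; int db; int step; char *query; } Slot;

/* Exit on allocation failure, otherwise pass the pointer through. */
static void *
check_alloc(void *ptr)
{
    if (ptr == NULL)
    {
        perror("alloc");
        exit(1);
    }
    return ptr;
}

/* Register one (query, process) callback pair; pairs run in order. */
void
task_add_step(Task *task, GetQueryCB query_cb, ProcessCB process_cb, void *arg)
{
    task->steps = check_alloc(realloc(task->steps,
                                      (task->nsteps + 1) * sizeof(Step)));
    task->steps[task->nsteps++] = (Step) {query_cb, process_cb, arg};
}

/* Ask the current step's query callback for the text to "send". */
static void
dispatch(Slot *slot, const Task *task, const char **dbs)
{
    const Step *step = &task->steps[slot->step];

    slot->query = step->query_cb(dbs[slot->db], step->arg);
}

/* Run every step on every database; returns passes over the slot array. */
int
task_run(const Task *task, const char **dbs, int ndbs, int jobs)
{
    Slot *slots;
    int   next_db = 0, complete = 0, passes = 0;

    assert(jobs > 0 && task->nsteps > 0);
    slots = check_alloc(calloc(jobs, sizeof(Slot)));

    while (complete < ndbs)
    {
        passes++;
        for (int i = 0; i < jobs; i++)
        {
            Slot       *slot = &slots[i];
            const Step *step;

            switch (slot->state)
            {
                case SLOT_FREE:     /* claim the next database, if any */
                    if (next_db >= ndbs)
                        break;
                    slot->db = next_db++;
                    slot->step = 0;
                    slot->state = SLOT_CONNECTING;
                    break;
                case SLOT_CONNECTING:   /* pretend the connection is ready */
                    slot->state = SLOT_RUNNING_QUERY;
                    dispatch(slot, task, dbs);
                    break;
                case SLOT_RUNNING_QUERY:    /* hand the "result" to the step */
                    step = &task->steps[slot->step];
                    step->process_cb(dbs[slot->db], slot->query, step->arg);
                    free(slot->query);
                    slot->query = NULL;
                    if (++slot->step >= task->nsteps)
                    {
                        complete++;
                        slot->state = SLOT_FREE;
                    }
                    else
                        dispatch(slot, task, dbs);
                    break;
            }
        }
    }
    free(slots);
    return passes;
}

/* Example callbacks: a constant query, and a counter of processed results. */
char *
example_query(const char *dbname, void *arg)
{
    const char *sql = "SELECT 1";

    (void) dbname;
    (void) arg;
    return strcpy(check_alloc(malloc(strlen(sql) + 1)), sql);
}

void
example_process(const char *dbname, const char *result, void *arg)
{
    (void) dbname;
    (void) result;
    (*(int *) arg)++;
}
```

With one registered step and four databases, this model needs 12 passes with
one slot and 3 passes with four slots.  Those are pass counts in the toy, not
timings, but they show why more slots help when each database spends most of
its time waiting.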

The other patches change several of the existing tasks to use this new API.
With these patches applied, I see the following differences in the output
of 'pg_upgrade | ts -i' for a cluster with 1k empty databases:

        WITHOUT PATCH

        00:00:19 Checking database user is the install user                    ok
        00:00:02 Checking for subscription state                               ok
        00:00:06 Adding ".old" suffix to old global/pg_control                 ok
        00:00:04 Checking for extension updates                                ok

        WITH PATCHES (--jobs 1)

        00:00:10 Checking database user is the install user                    ok
        00:00:02 Checking for subscription state                               ok
        00:00:07 Adding ".old" suffix to old global/pg_control                 ok
        00:00:05 Checking for extension updates                                ok

        WITH PATCHES (--jobs 4)

        00:00:06 Checking database user is the install user                    ok
        00:00:00 Checking for subscription state                               ok
        00:00:02 Adding ".old" suffix to old global/pg_control                 ok
        00:00:01 Checking for extension updates                                ok

Note that the "Checking database user is the install user" time also
includes the call to get_db_rel_and_slot_infos() on the old cluster as well
as the call to get_loadable_libraries() on the old cluster.  I believe the
improvement with the patches with just one job is due to the consolidation
of the queries into one database connection (presently,
get_db_rel_and_slot_infos() creates 3 connections per database for some
upgrades).  Similarly, the "Adding \".old\" suffix to old
global/pg_control" time includes the call to get_db_rel_and_slot_infos() on
the new cluster.

There are several remaining places where we could use this new API to speed
up upgrades.  For example, I haven't attempted to use it for the data type
checks yet, and that tends to eat up a sizable chunk of time when there are
many databases.

On Thu, May 16, 2024 at 08:24:08PM -0500, Nathan Bossart wrote:
> On Thu, May 16, 2024 at 05:09:55PM -0700, Jeff Davis wrote:
>> Also, did you consider connecting once to each database and running
>> many queries? Most of those seem like just checks.
> 
> This was the idea behind 347758b.  It may be possible to do more along
> these lines.  IMO parallelizing will still be useful even if we do combine
> more of the steps.

My current thinking is that any possible further consolidation should
happen as part of a follow-up effort to parallelization.  I'm cautiously
optimistic that the parallelization work will make the consolidation easier
since it moves things to rigidly-defined callback functions.

A separate piece of off-list feedback from Michael Paquier is that this new
parallel system might be something we can teach the ParallelSlot code used
by bin/scripts/ to do.  I've yet to look too deeply into this, but I
suspect that it will be difficult to combine the two.  For example, the
ParallelSlot system doesn't seem well-suited for the kind of
run-once-in-each-database tasks required by pg_upgrade, and the error
handling is probably a little different, too.  However, it's still worth a
closer look, and I'm interested in folks' opinions on the subject.

-- 
nathan
>From d7683a095d4d2c1574005eb41504a5be256d6480 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 11:02:44 -0500
Subject: [PATCH v2 1/6] introduce framework for parallelizing pg_upgrade tasks

---
 src/bin/pg_upgrade/Makefile      |   1 +
 src/bin/pg_upgrade/async.c       | 323 +++++++++++++++++++++++++++++++
 src/bin/pg_upgrade/meson.build   |   1 +
 src/bin/pg_upgrade/pg_upgrade.h  |  16 ++
 src/tools/pgindent/typedefs.list |   4 +
 5 files changed, 345 insertions(+)
 create mode 100644 src/bin/pg_upgrade/async.c

diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile
index bde91e2beb..3bc4f5d740 100644
--- a/src/bin/pg_upgrade/Makefile
+++ b/src/bin/pg_upgrade/Makefile
@@ -12,6 +12,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = \
        $(WIN32RES) \
+       async.o \
        check.o \
        controldata.o \
        dump.o \
diff --git a/src/bin/pg_upgrade/async.c b/src/bin/pg_upgrade/async.c
new file mode 100644
index 0000000000..7df1e7712d
--- /dev/null
+++ b/src/bin/pg_upgrade/async.c
@@ -0,0 +1,323 @@
+/*
+ * async.c
+ *
+ * parallelization via libpq's async APIs
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ * src/bin/pg_upgrade/async.c
+ */
+
+#include "postgres_fe.h"
+
+#include "common/connect.h"
+#include "fe_utils/string_utils.h"
+#include "pg_upgrade.h"
+
+static int     dbs_complete;
+static int     dbs_processing;
+
+typedef struct AsyncTaskCallbacks
+{
+       AsyncTaskGetQueryCB query_cb;
+       AsyncTaskProcessCB process_cb;
+       bool            free_result;
+       void       *arg;
+} AsyncTaskCallbacks;
+
+typedef struct AsyncTask
+{
+       AsyncTaskCallbacks *cbs;
+       int                     num_cb_sets;
+} AsyncTask;
+
+typedef enum
+{
+       FREE,
+       CONNECTING,
+       SETTING_SEARCH_PATH,
+       RUNNING_QUERY,
+} AsyncSlotState;
+
+typedef struct
+{
+       AsyncSlotState state;
+       int                     db;
+       int                     query;
+       PGconn     *conn;
+} AsyncSlot;
+
+AsyncTask *
+async_task_create(void)
+{
+       return pg_malloc0(sizeof(AsyncTask));
+}
+
+void
+async_task_free(AsyncTask *task)
+{
+       if (task->cbs)
+               pg_free(task->cbs);
+
+       pg_free(task);
+}
+
+void
+async_task_add_step(AsyncTask *task,
+                                       AsyncTaskGetQueryCB query_cb,
+                                       AsyncTaskProcessCB process_cb, bool free_result,
+                                       void *arg)
+{
+       AsyncTaskCallbacks *new_cbs;
+
+       task->cbs = pg_realloc(task->cbs,
+                                                  ++task->num_cb_sets * sizeof(AsyncTaskCallbacks));
+
+       new_cbs = &task->cbs[task->num_cb_sets - 1];
+       new_cbs->query_cb = query_cb;
+       new_cbs->process_cb = process_cb;
+       new_cbs->free_result = free_result;
+       new_cbs->arg = arg;
+}
+
+static void
+conn_failure(PGconn *conn)
+{
+       pg_log(PG_REPORT, "%s", PQerrorMessage(conn));
+       printf(_("Failure, exiting\n"));
+       exit(1);
+}
+
+static void
+start_conn(const ClusterInfo *cluster, AsyncSlot *slot)
+{
+       PQExpBufferData conn_opts;
+
+       /* Build connection string with proper quoting */
+       initPQExpBuffer(&conn_opts);
+       appendPQExpBufferStr(&conn_opts, "dbname=");
+       appendConnStrVal(&conn_opts, cluster->dbarr.dbs[slot->db].db_name);
+       appendPQExpBufferStr(&conn_opts, " user=");
+       appendConnStrVal(&conn_opts, os_info.user);
+       appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
+       if (cluster->sockdir)
+       {
+               appendPQExpBufferStr(&conn_opts, " host=");
+               appendConnStrVal(&conn_opts, cluster->sockdir);
+       }
+
+       slot->conn = PQconnectStart(conn_opts.data);
+       termPQExpBuffer(&conn_opts);
+
+       if (!slot->conn)
+               conn_failure(slot->conn);
+}
+
+static void
+dispatch_query(const ClusterInfo *cluster, AsyncSlot *slot,
+                          const AsyncTask *task)
+{
+       AsyncTaskCallbacks *cbs = &task->cbs[slot->query];
+       AsyncTaskGetQueryCB get_query = cbs->query_cb;
+       DbInfo     *dbinfo = &cluster->dbarr.dbs[slot->db];
+       char       *query = (*get_query) (dbinfo, cbs->arg);
+
+       if (!PQsendQuery(slot->conn, query))
+               conn_failure(slot->conn);
+
+       pg_free(query);
+}
+
+static PGresult *
+get_last_result(PGconn *conn)
+{
+       PGresult   *tmp;
+       PGresult   *res = NULL;
+
+       while ((tmp = PQgetResult(conn)) != NULL)
+       {
+               PQclear(res);
+               res = tmp;
+               if (PQstatus(conn) == CONNECTION_BAD)
+                       conn_failure(conn);
+       }
+
+       if (PQresultStatus(res) != PGRES_COMMAND_OK &&
+               PQresultStatus(res) != PGRES_TUPLES_OK)
+               conn_failure(conn);
+
+       return res;
+}
+
+static void
+process_query_result(const ClusterInfo *cluster, AsyncSlot *slot,
+                                        const AsyncTask *task)
+{
+       AsyncTaskCallbacks *cbs = &task->cbs[slot->query];
+       AsyncTaskProcessCB process_cb = cbs->process_cb;
+       DbInfo     *dbinfo = &cluster->dbarr.dbs[slot->db];
+       PGresult   *res = get_last_result(slot->conn);
+
+       (*process_cb) (dbinfo, res, cbs->arg);
+
+       if (cbs->free_result)
+               PQclear(res);
+}
+
+static void
+process_slot(const ClusterInfo *cluster, AsyncSlot *slot, const AsyncTask *task)
+{
+       switch (slot->state)
+       {
+               case FREE:
+                       if (dbs_processing >= cluster->dbarr.ndbs)
+                               return;
+                       slot->db = dbs_processing++;
+                       slot->state = CONNECTING;
+                       start_conn(cluster, slot);
+                       return;
+
+               case CONNECTING:
+                       if (PQconnectPoll(slot->conn) == PGRES_POLLING_FAILED)
+                               conn_failure(slot->conn);
+                       if (PQconnectPoll(slot->conn) != PGRES_POLLING_OK)
+                               return;
+                       slot->state = SETTING_SEARCH_PATH;
+                       if (!PQsendQuery(slot->conn, ALWAYS_SECURE_SEARCH_PATH_SQL))
+                               conn_failure(slot->conn);
+                       return;
+
+               case SETTING_SEARCH_PATH:
+                       if (!PQconsumeInput(slot->conn))
+                               conn_failure(slot->conn);
+                       if (PQisBusy(slot->conn))
+                               return;
+                       PQclear(get_last_result(slot->conn));
+                       slot->state = RUNNING_QUERY;
+                       dispatch_query(cluster, slot, task);
+                       return;
+
+               case RUNNING_QUERY:
+                       if (!PQconsumeInput(slot->conn))
+                               conn_failure(slot->conn);
+                       if (PQisBusy(slot->conn))
+                               return;
+                       process_query_result(cluster, slot, task);
+                       if (++slot->query >= task->num_cb_sets)
+                       {
+                               dbs_complete++;
+                               PQfinish(slot->conn);
+                               memset(slot, 0, sizeof(AsyncSlot));
+                               return;
+                       }
+                       dispatch_query(cluster, slot, task);
+                       return;
+       }
+}
+
+/*
+ * Wait on the slots to either finish connecting or to receive query results if
+ * possible.  This avoids a tight loop in async_task_run().
+ */
+static void
+wait_on_slots(AsyncSlot *slots, int numslots)
+{
+       fd_set          input_mask;
+       fd_set          output_mask;
+       fd_set          except_mask;
+       int                     maxFd = 0;
+
+       FD_ZERO(&input_mask);
+       FD_ZERO(&output_mask);
+       FD_ZERO(&except_mask);
+
+       for (int i = 0; i < numslots; i++)
+       {
+               int                     sock;
+               bool            read = false;
+
+               switch (slots[i].state)
+               {
+                       case FREE:
+
+                               /*
+                                * If we see a free slot, return right away so that it can be
+                                * reused immediately for the next database.  This might cause
+                                * us to spin more than necessary as we finish processing the
+                                * last few databases, but that shouldn't cause too much harm.
+                                */
+                               return;
+
+                       case CONNECTING:
+
+                               /*
+                                * If we are waiting for the connection to establish, choose
+                                * whether to wait for reading or for writing on the socket as
+                                * appropriate.  If neither apply, just return immediately so
+                                * that we can handle the slot.
+                                */
+                               {
+                                       PostgresPollingStatusType status;
+
+                                       status = PQconnectPoll(slots[i].conn);
+                                       if (status == PGRES_POLLING_READING)
+                                               read = true;
+                                       else if (status != PGRES_POLLING_WRITING)
+                                               return;
+                               }
+                               break;
+
+                       case SETTING_SEARCH_PATH:
+                       case RUNNING_QUERY:
+
+                               /*
+                                * If we've sent a query, we must wait for the socket to be
+                                * read-ready.  Note that process_slot() handles calling
+                                * PQconsumeInput() as required.
+                                */
+                               read = true;
+                               break;
+               }
+
+               /*
+                * If there's some problem retrieving the socket, just pretend this
+                * slot doesn't exist.  We don't expect this to happen regularly in
+                * practice, so it seems unlikely to cause too much harm.
+                */
+               sock = PQsocket(slots[i].conn);
+               if (sock < 0)
+                       continue;
+
+               /*
+                * Add the socket to the set.
+                */
+               FD_SET(sock, read ? &input_mask : &output_mask);
+               FD_SET(sock, &except_mask);
+               maxFd = Max(maxFd, sock);
+       }
+
+       /*
+        * If we found socket(s) to wait on, wait.
+        */
+       if (maxFd != 0)
+               (void) select(maxFd + 1, &input_mask, &output_mask, &except_mask, NULL);
+}
+
+void
+async_task_run(const AsyncTask *task, const ClusterInfo *cluster)
+{
+       int                     jobs = Max(1, user_opts.jobs);
+       AsyncSlot  *slots = pg_malloc0(sizeof(AsyncSlot) * jobs);
+
+       dbs_complete = 0;
+       dbs_processing = 0;
+
+       while (dbs_complete < cluster->dbarr.ndbs)
+       {
+               for (int i = 0; i < jobs; i++)
+                       process_slot(cluster, &slots[i], task);
+
+               wait_on_slots(slots, jobs);
+       }
+
+       pg_free(slots);
+}
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 9825fa3305..9eb48e176c 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -1,6 +1,7 @@
 # Copyright (c) 2022-2024, PostgreSQL Global Development Group
 
 pg_upgrade_sources = files(
+  'async.c',
   'check.c',
   'controldata.c',
   'dump.c',
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 8afe240bdf..1ebad3bd74 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -494,3 +494,19 @@ void               parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr
                                                                                   char *old_pgdata, char *new_pgdata,
                                                                                   char *old_tablespace);
 bool           reap_child(bool wait_for_child);
+
+/* async.c */
+
+typedef char *(*AsyncTaskGetQueryCB) (DbInfo *dbinfo, void *arg);
+typedef void (*AsyncTaskProcessCB) (DbInfo *dbinfo, PGresult *res, void *arg);
+
+/* struct definition is private to async.c */
+typedef struct AsyncTask AsyncTask;
+
+AsyncTask  *async_task_create(void);
+void           async_task_add_step(AsyncTask *task,
+                                                               AsyncTaskGetQueryCB query_cb,
+                                                               AsyncTaskProcessCB process_cb, bool free_result,
+                                                               void *arg);
+void           async_task_run(const AsyncTask *task, const ClusterInfo *cluster);
+void           async_task_free(AsyncTask *task);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e6c1caf649..3d219cbfe2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -153,6 +153,10 @@ ArrayMetaState
 ArraySubWorkspace
 ArrayToken
 ArrayType
+AsyncSlot
+AsyncSlotState
+AsyncTask
+AsyncTaskCallbacks
 AsyncQueueControl
 AsyncQueueEntry
 AsyncRequest
-- 
2.39.3 (Apple Git-146)

>From c84b3c97cb0befff8027702f1674e809f174b3aa Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 17:21:19 -0500
Subject: [PATCH v2 2/6] use new pg_upgrade async API for subscription state
 checks

---
 src/bin/pg_upgrade/check.c | 200 ++++++++++++++++++++-----------------
 1 file changed, 106 insertions(+), 94 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 27924159d6..f653fa25a5 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1906,6 +1906,75 @@ check_old_cluster_for_valid_slots(bool live_check)
        check_ok();
 }
 
+/* private state for subscription state checks */
+struct substate_info
+{
+       FILE       *script;
+       char            output_path[MAXPGPATH];
+};
+
+/*
+ * We don't allow upgrade if there is a risk of dangling slot or origin
+ * corresponding to initial sync after upgrade.
+ *
+ * A slot/origin not created yet refers to the 'i' (initialize) state, while
+ * 'r' (ready) state refers to a slot/origin created previously but already
+ * dropped. These states are supported for pg_upgrade. The other states listed
+ * below are not supported:
+ *
+ * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state would
+ * retain a replication slot, which could not be dropped by the sync worker
+ * spawned after the upgrade because the subscription ID used for the slot name
+ * won't match anymore.
+ *
+ * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state would
+ * retain the replication origin when there is a failure in tablesync worker
+ * immediately after dropping the replication slot in the publisher.
+ *
+ * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on a
+ * relation upgraded while in this state would expect an origin ID with the OID
+ * of the subscription used before the upgrade, causing it to fail.
+ *
+ * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and SUBREL_STATE_UNKNOWN:
+ * These states are not stored in the catalog, so we need not allow these
+ * states.
+ */
+static char *
+sub_query(DbInfo *dbinfo, void *arg)
+{
+       return pg_strdup("SELECT r.srsubstate, s.subname, n.nspname, c.relname "
+                                        "FROM pg_catalog.pg_subscription_rel r "
+                                        "LEFT JOIN pg_catalog.pg_subscription s"
+                                        "   ON r.srsubid = s.oid "
+                                        "LEFT JOIN pg_catalog.pg_class c"
+                                        "   ON r.srrelid = c.oid "
+                                        "LEFT JOIN pg_catalog.pg_namespace n"
+                                        "   ON c.relnamespace = n.oid "
+                                        "WHERE r.srsubstate NOT IN ('i', 'r') "
+                                        "ORDER BY s.subname");
+}
+
+static void
+sub_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       struct substate_info *state = (struct substate_info *) arg;
+       int                     ntup = PQntuples(res);
+
+       for (int i = 0; i < ntup; i++)
+       {
+               if (state->script == NULL &&
+                       (state->script = fopen_priv(state->output_path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", state->output_path);
+
+               fprintf(state->script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
+                               PQgetvalue(res, i, 0),
+                               dbinfo->db_name,
+                               PQgetvalue(res, i, 1),
+                               PQgetvalue(res, i, 2),
+                               PQgetvalue(res, i, 3));
+       }
+}
+
 /*
  * check_old_cluster_subscription_state()
  *
@@ -1916,115 +1985,58 @@ check_old_cluster_for_valid_slots(bool live_check)
 static void
 check_old_cluster_subscription_state(void)
 {
-       FILE       *script = NULL;
-       char            output_path[MAXPGPATH];
+       AsyncTask  *task = async_task_create();
+       struct substate_info state;
+       PGresult   *res;
+       PGconn     *conn;
        int                     ntup;
 
        prep_status("Checking for subscription state");
 
-       snprintf(output_path, sizeof(output_path), "%s/%s",
+       state.script = NULL;
+       snprintf(state.output_path, sizeof(state.output_path), "%s/%s",
                         log_opts.basedir,
                         "subs_invalid.txt");
-       for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               DbInfo     *active_db = &old_cluster.dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(&old_cluster, active_db->db_name);
 
-               /* We need to check for pg_replication_origin only once. */
-               if (dbnum == 0)
-               {
-                       /*
-                        * Check that all the subscriptions have their respective
-                        * replication origin.
-                        */
-                       res = executeQueryOrDie(conn,
-                                                                       "SELECT d.datname, s.subname "
-                                                                       "FROM pg_catalog.pg_subscription s "
-                                                                       "LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
-                                                                       "       ON o.roname = 'pg_' || s.oid "
-                                                                       "INNER JOIN pg_catalog.pg_database d "
-                                                                       "       ON d.oid = s.subdbid "
-                                                                       "WHERE o.roname IS NULL;");
-
-                       ntup = PQntuples(res);
-                       for (int i = 0; i < ntup; i++)
-                       {
-                               if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-                                       pg_fatal("could not open file \"%s\": %m", output_path);
-                               fprintf(script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n",
-                                               PQgetvalue(res, i, 0),
-                                               PQgetvalue(res, i, 1));
-                       }
-                       PQclear(res);
-               }
-
-               /*
-                * We don't allow upgrade if there is a risk of dangling slot or
-                * origin corresponding to initial sync after upgrade.
-                *
-                * A slot/origin not created yet refers to the 'i' (initialize) state,
-                * while 'r' (ready) state refers to a slot/origin created previously
-                * but already dropped. These states are supported for pg_upgrade. The
-                * other states listed below are not supported:
-                *
-                * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state
-                * would retain a replication slot, which could not be dropped by the
-                * sync worker spawned after the upgrade because the subscription ID
-                * used for the slot name won't match anymore.
-                *
-                * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state
-                * would retain the replication origin when there is a failure in
-                * tablesync worker immediately after dropping the replication slot in
-                * the publisher.
-                *
-                * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on
-                * a relation upgraded while in this state would expect an origin ID
-                * with the OID of the subscription used before the upgrade, causing
-                * it to fail.
-                *
-                * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
-                * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog,
-                * so we need not allow these states.
-                */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT r.srsubstate, s.subname, n.nspname, c.relname "
-                                                               "FROM pg_catalog.pg_subscription_rel r "
-                                                               "LEFT JOIN pg_catalog.pg_subscription s"
-                                                               "       ON r.srsubid = s.oid "
-                                                               "LEFT JOIN pg_catalog.pg_class c"
-                                                               "       ON r.srrelid = c.oid "
-                                                               "LEFT JOIN pg_catalog.pg_namespace n"
-                                                               "       ON c.relnamespace = n.oid "
-                                                               "WHERE r.srsubstate NOT IN ('i', 'r') "
-                                                               "ORDER BY s.subname");
-
-               ntup = PQntuples(res);
-               for (int i = 0; i < ntup; i++)
-               {
-                       if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", output_path);
+       /*
+        * Check that all the subscriptions have their respective replication
+        * origin.  This check only needs to run once.
+        */
+       conn = connectToServer(&old_cluster, old_cluster.dbarr.dbs[0].db_name);
+       res = executeQueryOrDie(conn,
+                                                       "SELECT d.datname, s.subname "
+                                                       "FROM pg_catalog.pg_subscription s "
+                                                       "LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
+                                                       "       ON o.roname = 'pg_' || s.oid "
+                                                       "INNER JOIN pg_catalog.pg_database d "
+                                                       "       ON d.oid = s.subdbid "
+                                                       "WHERE o.roname IS NULL;");
+       ntup = PQntuples(res);
+       for (int i = 0; i < ntup; i++)
+       {
+               if (state.script == NULL &&
+                       (state.script = fopen_priv(state.output_path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", state.output_path);
+               fprintf(state.script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n",
+                               PQgetvalue(res, i, 0),
+                               PQgetvalue(res, i, 1));
+       }
+       PQclear(res);
+       PQfinish(conn);
 
-                       fprintf(script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
-                                       PQgetvalue(res, i, 0),
-                                       active_db->db_name,
-                                       PQgetvalue(res, i, 1),
-                                       PQgetvalue(res, i, 2),
-                                       PQgetvalue(res, i, 3));
-               }
+       async_task_add_step(task, sub_query, sub_process, true, &state);
 
-               PQclear(res);
-               PQfinish(conn);
-       }
+       async_task_run(task, &old_cluster);
+       async_task_free(task);
 
-       if (script)
+       if (state.script)
        {
-               fclose(script);
+               fclose(state.script);
                pg_log(PG_REPORT, "fatal");
                pg_fatal("Your installation contains subscriptions without origin or having relations not in i (initialize) or r (ready) state.\n"
                                 "You can allow the initial sync to finish for all relations and then restart the upgrade.\n"
                                 "A list of the problematic subscriptions is in the file:\n"
-                                "    %s", output_path);
+                                "    %s", state.output_path);
        }
        else
                check_ok();
-- 
2.39.3 (Apple Git-146)

>From c9d2c483ac1fabc0897c19a60a1cf6054e1293da Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Tue, 21 May 2024 16:35:19 -0500
Subject: [PATCH v2 3/6] move live_check variable to user_opts

---
 src/bin/pg_upgrade/check.c       | 32 ++++++++++++++++----------------
 src/bin/pg_upgrade/controldata.c |  5 +++--
 src/bin/pg_upgrade/info.c        | 12 +++++-------
 src/bin/pg_upgrade/option.c      |  4 ++--
 src/bin/pg_upgrade/pg_upgrade.c  | 21 ++++++++++-----------
 src/bin/pg_upgrade/pg_upgrade.h  | 13 +++++++------
 6 files changed, 43 insertions(+), 44 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index f653fa25a5..251f3d9017 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -29,7 +29,7 @@ static void check_for_new_tablespace_dir(void);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
 static void check_new_cluster_logical_replication_slots(void);
 static void check_new_cluster_subscription_configuration(void);
-static void check_old_cluster_for_valid_slots(bool live_check);
+static void check_old_cluster_for_valid_slots(void);
 static void check_old_cluster_subscription_state(void);
 
 /*
@@ -555,9 +555,9 @@ fix_path_separator(char *path)
 }
 
 void
-output_check_banner(bool live_check)
+output_check_banner(void)
 {
-       if (user_opts.check && live_check)
+       if (user_opts.live_check)
        {
                pg_log(PG_REPORT,
                           "Performing Consistency Checks on Old Live Server\n"
@@ -573,18 +573,18 @@ output_check_banner(bool live_check)
 
 
 void
-check_and_dump_old_cluster(bool live_check)
+check_and_dump_old_cluster(void)
 {
        /* -- OLD -- */
 
-       if (!live_check)
+       if (!user_opts.live_check)
                start_postmaster(&old_cluster, true);
 
        /*
         * Extract a list of databases, tables, and logical replication slots from
         * the old cluster.
         */
-       get_db_rel_and_slot_infos(&old_cluster, live_check);
+       get_db_rel_and_slot_infos(&old_cluster);
 
        init_tablespaces();
 
@@ -605,7 +605,7 @@ check_and_dump_old_cluster(bool live_check)
                 * Logical replication slots can be migrated since PG17. See comments
                 * atop get_old_cluster_logical_slot_infos().
                 */
-               check_old_cluster_for_valid_slots(live_check);
+               check_old_cluster_for_valid_slots();
 
                /*
                 * Subscriptions and their dependencies can be migrated since PG17.
@@ -652,7 +652,7 @@ check_and_dump_old_cluster(bool live_check)
         */
        if (GET_MAJOR_VERSION(old_cluster.major_version) <= 906)
        {
-               if (user_opts.check)
+               if (user_opts.live_check)
                        old_9_6_invalidate_hash_indexes(&old_cluster, true);
        }
 
@@ -667,7 +667,7 @@ check_and_dump_old_cluster(bool live_check)
        if (!user_opts.check)
                generate_old_dump();
 
-       if (!live_check)
+       if (!user_opts.live_check)
                stop_postmaster(false);
 }
 
@@ -675,7 +675,7 @@ check_and_dump_old_cluster(bool live_check)
 void
 check_new_cluster(void)
 {
-       get_db_rel_and_slot_infos(&new_cluster, false);
+       get_db_rel_and_slot_infos(&new_cluster);
 
        check_new_cluster_is_empty();
 
@@ -826,14 +826,14 @@ check_cluster_versions(void)
 
 
 void
-check_cluster_compatibility(bool live_check)
+check_cluster_compatibility(void)
 {
        /* get/check pg_control data of servers */
-       get_control_data(&old_cluster, live_check);
-       get_control_data(&new_cluster, false);
+       get_control_data(&old_cluster);
+       get_control_data(&new_cluster);
        check_control_data(&old_cluster.controldata, &new_cluster.controldata);
 
-       if (live_check && old_cluster.port == new_cluster.port)
+       if (user_opts.live_check && old_cluster.port == new_cluster.port)
                pg_fatal("When checking a live server, "
                                 "the old and new port numbers must be different.");
 }
@@ -1839,7 +1839,7 @@ check_new_cluster_subscription_configuration(void)
  * before shutdown.
  */
 static void
-check_old_cluster_for_valid_slots(bool live_check)
+check_old_cluster_for_valid_slots(void)
 {
        char            output_path[MAXPGPATH];
        FILE       *script = NULL;
@@ -1878,7 +1878,7 @@ check_old_cluster_for_valid_slots(bool live_check)
                         * Note: This can be satisfied only when the old cluster has been
                         * shut down, so we skip this for live checks.
                         */
-                       if (!live_check && !slot->caught_up)
+                       if (!user_opts.live_check && !slot->caught_up)
                        {
                                if (script == NULL &&
                                        (script = fopen_priv(output_path, "w")) == NULL)
diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c
index 1f0ccea3ed..cf665b9dee 100644
--- a/src/bin/pg_upgrade/controldata.c
+++ b/src/bin/pg_upgrade/controldata.c
@@ -33,7 +33,7 @@
  * return valid xid data for a running server.
  */
 void
-get_control_data(ClusterInfo *cluster, bool live_check)
+get_control_data(ClusterInfo *cluster)
 {
        char            cmd[MAXPGPATH];
        char            bufin[MAX_STRING];
@@ -76,6 +76,7 @@ get_control_data(ClusterInfo *cluster, bool live_check)
        uint32          segno = 0;
        char       *resetwal_bin;
        int                     rc;
+       bool            live_check = (cluster == &old_cluster && user_opts.live_check);
 
        /*
         * Because we test the pg_resetwal output as strings, it has to be in
@@ -118,7 +119,7 @@ get_control_data(ClusterInfo *cluster, bool live_check)
        /*
         * Check for clean shutdown
         */
-       if (!live_check || cluster == &new_cluster)
+       if (!live_check)
        {
                /* only pg_controldata outputs the cluster state */
                snprintf(cmd, sizeof(cmd), "\"%s/pg_controldata\" \"%s\"",
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 95c22a7200..8f1777de59 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -27,7 +27,7 @@ static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
 static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check);
+static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo);
 static void get_db_subscription_count(DbInfo *dbinfo);
 
 
@@ -273,11 +273,9 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
  *
  * higher level routine to generate dbinfos for the database running
  * on the given "port". Assumes that server is already running.
- *
- * live_check would be used only when the target is the old cluster.
  */
 void
-get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check)
+get_db_rel_and_slot_infos(ClusterInfo *cluster)
 {
        int                     dbnum;
 
@@ -299,7 +297,7 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check)
                 */
                if (cluster == &old_cluster)
                {
-                       get_old_cluster_logical_slot_infos(pDbInfo, live_check);
+                       get_old_cluster_logical_slot_infos(pDbInfo);
                        get_db_subscription_count(pDbInfo);
                }
        }
@@ -645,7 +643,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
  * are included.
  */
 static void
-get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
+get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
 {
        PGconn     *conn;
        PGresult   *res;
@@ -681,7 +679,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
                                                        "WHERE slot_type = 'logical' AND "
                                                        "database = current_database() AND "
                                                        "temporary IS FALSE;",
-                                                       live_check ? "FALSE" :
+                                                       user_opts.live_check ? "FALSE" :
                                                        "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
                                                        "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
                                                        "END)");
diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index 548ea4e623..6f41d63eed 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -470,10 +470,10 @@ adjust_data_dir(ClusterInfo *cluster)
  * directory.
  */
 void
-get_sock_dir(ClusterInfo *cluster, bool live_check)
+get_sock_dir(ClusterInfo *cluster)
 {
 #if !defined(WIN32)
-       if (!live_check)
+       if (!user_opts.live_check || cluster == &new_cluster)
                cluster->sockdir = user_opts.socketdir;
        else
        {
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index af370768b6..3f4ad7d5cc 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -65,7 +65,7 @@ static void create_new_objects(void);
 static void copy_xact_xlog_xid(void);
 static void set_frozenxids(bool minmxid_only);
 static void make_outputdirs(char *pgdata);
-static void setup(char *argv0, bool *live_check);
+static void setup(char *argv0);
 static void create_logical_replication_slots(void);
 
 ClusterInfo old_cluster,
@@ -88,7 +88,6 @@ int
 main(int argc, char **argv)
 {
        char       *deletion_script_file_name = NULL;
-       bool            live_check = false;
 
        /*
         * pg_upgrade doesn't currently use common/logging.c, but initialize it
@@ -123,18 +122,18 @@ main(int argc, char **argv)
         */
        make_outputdirs(new_cluster.pgdata);
 
-       setup(argv[0], &live_check);
+       setup(argv[0]);
 
-       output_check_banner(live_check);
+       output_check_banner();
 
        check_cluster_versions();
 
-       get_sock_dir(&old_cluster, live_check);
-       get_sock_dir(&new_cluster, false);
+       get_sock_dir(&old_cluster);
+       get_sock_dir(&new_cluster);
 
-       check_cluster_compatibility(live_check);
+       check_cluster_compatibility();
 
-       check_and_dump_old_cluster(live_check);
+       check_and_dump_old_cluster();
 
 
        /* -- NEW -- */
@@ -331,7 +330,7 @@ make_outputdirs(char *pgdata)
 
 
 static void
-setup(char *argv0, bool *live_check)
+setup(char *argv0)
 {
        /*
         * make sure the user has a clean environment, otherwise, we may confuse
@@ -378,7 +377,7 @@ setup(char *argv0, bool *live_check)
                                pg_fatal("There seems to be a postmaster servicing the old cluster.\n"
                                                 "Please shutdown that postmaster and try again.");
                        else
-                               *live_check = true;
+                               user_opts.live_check = true;
                }
        }
 
@@ -648,7 +647,7 @@ create_new_objects(void)
                set_frozenxids(true);
 
        /* update new_cluster info now that we have objects in the databases */
-       get_db_rel_and_slot_infos(&new_cluster, false);
+       get_db_rel_and_slot_infos(&new_cluster);
 }
 
 /*
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 1ebad3bd74..56d05d7eb9 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -322,6 +322,7 @@ typedef struct
 typedef struct
 {
        bool            check;                  /* check clusters only, don't change any data */
+       bool            live_check;             /* check clusters only, old server is running */
        bool            do_sync;                /* flush changes to disk */
        transferMode transfer_mode; /* copy files or link them? */
        int                     jobs;                   /* number of processes/threads to use */
@@ -366,20 +367,20 @@ extern OSInfo os_info;
 
 /* check.c */
 
-void           output_check_banner(bool live_check);
-void           check_and_dump_old_cluster(bool live_check);
+void           output_check_banner(void);
+void           check_and_dump_old_cluster(void);
 void           check_new_cluster(void);
 void           report_clusters_compatible(void);
 void           issue_warnings_and_set_wal_level(void);
 void           output_completion_banner(char *deletion_script_file_name);
 void           check_cluster_versions(void);
-void           check_cluster_compatibility(bool live_check);
+void           check_cluster_compatibility(void);
 void           create_script_for_old_cluster_deletion(char **deletion_script_file_name);
 
 
 /* controldata.c */
 
-void           get_control_data(ClusterInfo *cluster, bool live_check);
+void           get_control_data(ClusterInfo *cluster);
 void           check_control_data(ControlData *oldctrl, ControlData *newctrl);
 void           disable_old_cluster(void);
 
@@ -428,7 +429,7 @@ void                check_loadable_libraries(void);
 FileNameMap *gen_db_file_maps(DbInfo *old_db,
                                                          DbInfo *new_db, int *nmaps, const char *old_pgdata,
                                                          const char *new_pgdata);
-void           get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check);
+void           get_db_rel_and_slot_infos(ClusterInfo *cluster);
 int                    count_old_cluster_logical_slots(void);
 int                    count_old_cluster_subscriptions(void);
 
@@ -436,7 +437,7 @@ int                 count_old_cluster_subscriptions(void);
 
 void           parseCommandLine(int argc, char *argv[]);
 void           adjust_data_dir(ClusterInfo *cluster);
-void           get_sock_dir(ClusterInfo *cluster, bool live_check);
+void           get_sock_dir(ClusterInfo *cluster);
 
 /* relfilenumber.c */
 
-- 
2.39.3 (Apple Git-146)

From 48943ad85f83ba44ea01e4b1fdd5c4afc53552e3 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 21:09:33 -0500
Subject: [PATCH v2 4/6] use new pg_upgrade async API for retrieving relinfos

---
 src/bin/pg_upgrade/info.c | 187 +++++++++++++++++---------------------
 1 file changed, 81 insertions(+), 106 deletions(-)

diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 8f1777de59..d07255bd0a 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -22,13 +22,16 @@ static void report_unmatched_relation(const RelInfo *rel, const DbInfo *db,
 static void free_db_and_rel_infos(DbInfoArr *db_arr);
 static void get_template0_info(ClusterInfo *cluster);
 static void get_db_infos(ClusterInfo *cluster);
-static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
+static char *get_rel_infos_query(DbInfo *dbinfo, void *arg);
+static void get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg);
 static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
 static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo);
-static void get_db_subscription_count(DbInfo *dbinfo);
+static char *get_old_cluster_logical_slot_infos_query(DbInfo *dbinfo, void *arg);
+static void get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult *res, void *arg);
+static char *get_db_subscription_count_query(DbInfo *dbinfo, void *arg);
+static void get_db_subscription_count_result(DbInfo *dbinfo, PGresult *res, void *arg);
 
 
 /*
@@ -277,7 +280,7 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
 void
 get_db_rel_and_slot_infos(ClusterInfo *cluster)
 {
-       int                     dbnum;
+       AsyncTask  *task = async_task_create();
 
        if (cluster->dbarr.dbs != NULL)
                free_db_and_rel_infos(&cluster->dbarr);
@@ -285,23 +288,26 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster)
        get_template0_info(cluster);
        get_db_infos(cluster);
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+       async_task_add_step(task,
+                                               get_rel_infos_query,
+                                               get_rel_infos_result,
+                                               true, NULL);
+       if (cluster == &old_cluster &&
+               GET_MAJOR_VERSION(cluster->major_version) > 1600)
        {
-               DbInfo     *pDbInfo = &cluster->dbarr.dbs[dbnum];
-
-               get_rel_infos(cluster, pDbInfo);
-
-               /*
-                * Retrieve the logical replication slots infos and the subscriptions
-                * count for the old cluster.
-                */
-               if (cluster == &old_cluster)
-               {
-                       get_old_cluster_logical_slot_infos(pDbInfo);
-                       get_db_subscription_count(pDbInfo);
-               }
+               async_task_add_step(task,
+                                                       get_old_cluster_logical_slot_infos_query,
+                                                       get_old_cluster_logical_slot_infos_result,
+                                                       true, cluster);
+               async_task_add_step(task,
+                                                       get_db_subscription_count_query,
+                                                       get_db_subscription_count_result,
+                                                       true, cluster);
        }
 
+       async_task_run(task, cluster);
+       async_task_free(task);
+
        if (cluster == &old_cluster)
                pg_log(PG_VERBOSE, "\nsource databases:");
        else
@@ -447,30 +453,10 @@ get_db_infos(ClusterInfo *cluster)
  * Note: the resulting RelInfo array is assumed to be sorted by OID.
  * This allows later processing to match up old and new databases efficiently.
  */
-static void
-get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
+static char *
+get_rel_infos_query(DbInfo *dbinfo, void *arg)
 {
-       PGconn     *conn = connectToServer(cluster,
-                                                                          dbinfo->db_name);
-       PGresult   *res;
-       RelInfo    *relinfos;
-       int                     ntups;
-       int                     relnum;
-       int                     num_rels = 0;
-       char       *nspname = NULL;
-       char       *relname = NULL;
-       char       *tablespace = NULL;
-       int                     i_spclocation,
-                               i_nspname,
-                               i_relname,
-                               i_reloid,
-                               i_indtable,
-                               i_toastheap,
-                               i_relfilenumber,
-                               i_reltablespace;
-       char            query[QUERY_ALLOC];
-       char       *last_namespace = NULL,
-                          *last_tablespace = NULL;
+       char       *query = pg_malloc(QUERY_ALLOC);
 
        query[0] = '\0';                        /* initialize query string to empty */
 
@@ -484,7 +470,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
         * output, so we have to copy that system table.  It's easiest to do that
         * by treating it as a user table.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
+       snprintf(query + strlen(query), QUERY_ALLOC - strlen(query),
                         "WITH regular_heap (reloid, indtable, toastheap) AS ( "
                         "  SELECT c.oid, 0::oid, 0::oid "
                         "  FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n "
@@ -506,7 +492,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
         * selected by the regular_heap CTE.  (We have to do this separately
         * because the namespace-name rules above don't work for toast tables.)
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
+       snprintf(query + strlen(query), QUERY_ALLOC - strlen(query),
                         "  toast_heap (reloid, indtable, toastheap) AS ( "
                         "  SELECT c.reltoastrelid, 0::oid, c.oid "
                         "  FROM regular_heap JOIN pg_catalog.pg_class c "
@@ -519,7 +505,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
         * Testing indisready is necessary in 9.2, and harmless in earlier/later
         * versions.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
+       snprintf(query + strlen(query), QUERY_ALLOC - strlen(query),
                         "  all_index (reloid, indtable, toastheap) AS ( "
                         "  SELECT indexrelid, indrelid, 0::oid "
                         "  FROM pg_catalog.pg_index "
@@ -533,7 +519,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
         * And now we can write the query that retrieves the data we want for each
         * heap and index relation.  Make sure result is sorted by OID.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
+       snprintf(query + strlen(query), QUERY_ALLOC - strlen(query),
                         "SELECT all_rels.*, n.nspname, c.relname, "
                         "  c.relfilenode, c.reltablespace, "
                         "  pg_catalog.pg_tablespace_location(t.oid) AS spclocation "
@@ -550,22 +536,30 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
                         "     ON c.reltablespace = t.oid "
                         "ORDER BY 1;");
 
-       res = executeQueryOrDie(conn, "%s", query);
-
-       ntups = PQntuples(res);
-
-       relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+       return query;
+}
 
-       i_reloid = PQfnumber(res, "reloid");
-       i_indtable = PQfnumber(res, "indtable");
-       i_toastheap = PQfnumber(res, "toastheap");
-       i_nspname = PQfnumber(res, "nspname");
-       i_relname = PQfnumber(res, "relname");
-       i_relfilenumber = PQfnumber(res, "relfilenode");
-       i_reltablespace = PQfnumber(res, "reltablespace");
-       i_spclocation = PQfnumber(res, "spclocation");
+static void
+get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       int                     ntups = PQntuples(res);
+       RelInfo    *relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+       int                     i_reloid = PQfnumber(res, "reloid");
+       int                     i_indtable = PQfnumber(res, "indtable");
+       int                     i_toastheap = PQfnumber(res, "toastheap");
+       int                     i_nspname = PQfnumber(res, "nspname");
+       int                     i_relname = PQfnumber(res, "relname");
+       int                     i_relfilenumber = PQfnumber(res, "relfilenode");
+       int                     i_reltablespace = PQfnumber(res, "reltablespace");
+       int                     i_spclocation = PQfnumber(res, "spclocation");
+       int                     num_rels = 0;
+       char       *nspname = NULL;
+       char       *relname = NULL;
+       char       *tablespace = NULL;
+       char       *last_namespace = NULL;
+       char       *last_tablespace = NULL;
 
-       for (relnum = 0; relnum < ntups; relnum++)
+       for (int relnum = 0; relnum < ntups; relnum++)
        {
                RelInfo    *curr = &relinfos[num_rels++];
 
@@ -618,9 +612,6 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
                        /* A zero reltablespace oid indicates the database tablespace. */
                        curr->tablespace = dbinfo->db_tablespace;
        }
-       PQclear(res);
-
-       PQfinish(conn);
 
        dbinfo->rel_arr.rels = relinfos;
        dbinfo->rel_arr.nrels = num_rels;
@@ -642,20 +633,9 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
  * check_old_cluster_for_valid_slots() would raise a FATAL error if such slots
  * are included.
  */
-static void
-get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
+static char *
+get_old_cluster_logical_slot_infos_query(DbInfo *dbinfo, void *arg)
 {
-       PGconn     *conn;
-       PGresult   *res;
-       LogicalSlotInfo *slotinfos = NULL;
-       int                     num_slots;
-
-       /* Logical slots can be migrated since PG17. */
-       if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
-               return;
-
-       conn = connectToServer(&old_cluster, dbinfo->db_name);
-
        /*
         * Fetch the logical replication slot information. The check whether the
         * slot is considered caught up is done by an upgrade function. This
@@ -673,18 +653,23 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
         * started and stopped several times causing any temporary slots to be
         * removed.
         */
-       res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, "
-                                                       "%s as caught_up, invalidation_reason IS NOT NULL as invalid "
-                                                       "FROM pg_catalog.pg_replication_slots "
-                                                       "WHERE slot_type = 'logical' AND "
-                                                       "database = current_database() AND "
-                                                       "temporary IS FALSE;",
-                                                       user_opts.live_check ? "FALSE" :
-                                                       "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
-                                                       "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
-                                                       "END)");
-
-       num_slots = PQntuples(res);
+       return psprintf("SELECT slot_name, plugin, two_phase, failover, "
+                                       "%s as caught_up, invalidation_reason IS NOT NULL as invalid "
+                                       "FROM pg_catalog.pg_replication_slots "
+                                       "WHERE slot_type = 'logical' AND "
+                                       "database = current_database() AND "
+                                       "temporary IS FALSE;",
+                                       user_opts.live_check ? "FALSE" :
+                                       "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
+                                       "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
+                                       "END)");
+}
+
+static void
+get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       LogicalSlotInfo *slotinfos = NULL;
+       int                     num_slots = PQntuples(res);
 
        if (num_slots)
        {
@@ -717,14 +702,10 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
                }
        }
 
-       PQclear(res);
-       PQfinish(conn);
-
        dbinfo->slot_arr.slots = slotinfos;
        dbinfo->slot_arr.nslots = num_slots;
 }
 
-
 /*
  * count_old_cluster_logical_slots()
  *
@@ -754,24 +735,18 @@ count_old_cluster_logical_slots(void)
  * This is because before that the logical slots are not upgraded, so we will
  * not be able to upgrade the logical replication clusters completely.
  */
-static void
-get_db_subscription_count(DbInfo *dbinfo)
+static char *
+get_db_subscription_count_query(DbInfo *dbinfo, void *arg)
 {
-       PGconn     *conn;
-       PGresult   *res;
-
-       /* Subscriptions can be migrated since PG17. */
-       if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700)
-               return;
+       return psprintf("SELECT count(*) "
+                                       "FROM pg_catalog.pg_subscription WHERE subdbid = %u",
+                                       dbinfo->db_oid);
+}
 
-       conn = connectToServer(&old_cluster, dbinfo->db_name);
-       res = executeQueryOrDie(conn, "SELECT count(*) "
-                                                       "FROM pg_catalog.pg_subscription WHERE subdbid = %u",
-                                                       dbinfo->db_oid);
+static void
+get_db_subscription_count_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
        dbinfo->nsubs = atoi(PQgetvalue(res, 0, 0));
-
-       PQclear(res);
-       PQfinish(conn);
 }
 
 /*
-- 
2.39.3 (Apple Git-146)
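
For anyone skimming 0004, the shape of the query/result callback split can be
sketched outside of pg_upgrade.  Everything below is a hypothetical stand-in
(DemoDb, DemoStep, demo_run, and the demo_count_* callbacks are not names from
the patches), there is no libpq involved, and the driver is a plain serial
loop rather than the asynchronous slot machinery in 0001.  The "result" handed
to the second callback is just the query text, purely so the example runs on
its own:

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a per-database entry; not pg_upgrade's DbInfo. */
typedef struct DemoDb
{
    const char *name;
    int         result_len;     /* filled in by the result callback */
} DemoDb;

/* One callback builds the query text, the other consumes the result. */
typedef char *(*DemoQueryCB) (DemoDb *db, void *arg);
typedef void (*DemoResultCB) (DemoDb *db, const char *result, void *arg);

typedef struct DemoStep
{
    DemoQueryCB  query_cb;
    DemoResultCB result_cb;
    void        *arg;           /* opaque state shared with both callbacks */
} DemoStep;

/* Serial driver: run every step against every database, in order. */
static void
demo_run(DemoDb *dbs, int ndbs, const DemoStep *steps, int nsteps)
{
    for (int d = 0; d < ndbs; d++)
    {
        for (int s = 0; s < nsteps; s++)
        {
            char       *query = steps[s].query_cb(&dbs[d], steps[s].arg);

            /* No server here: the query text doubles as the "result". */
            steps[s].result_cb(&dbs[d], query, steps[s].arg);
            free(query);        /* the driver owns the returned string */
        }
    }
}

static char *
demo_count_query(DemoDb *db, void *arg)
{
    size_t      len = strlen(db->name) + 32;
    char       *query = malloc(len);

    (void) arg;
    assert(query != NULL);
    snprintf(query, len, "SELECT count(*) FROM %s", db->name);
    return query;
}

static void
demo_count_result(DemoDb *db, const char *result, void *arg)
{
    int        *total = (int *) arg;

    db->result_len = (int) strlen(result);
    *total += db->result_len;
}

/* Runs one step over two fake databases; returns the accumulated total. */
int
demo_total(void)
{
    DemoDb      dbs[] = {{"a", 0}, {"bc", 0}};
    int         total = 0;
    DemoStep    step = {demo_count_query, demo_count_result, &total};

    demo_run(dbs, 2, &step, 1);
    return total;
}
```

The point of the split is that the callbacks never touch a connection, so
connection handling and result cleanup can live in one place in the driver.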

From 09e7e7baa8c277a3afbed1e2f8d05bfa7fcc586c Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 21:24:35 -0500
Subject: [PATCH v2 5/6] use new pg_upgrade async API to parallelize getting
 loadable libraries

---
 src/bin/pg_upgrade/function.c | 63 ++++++++++++++++++++---------------
 1 file changed, 37 insertions(+), 26 deletions(-)

diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c
index 7e3abed098..c11fce0696 100644
--- a/src/bin/pg_upgrade/function.c
+++ b/src/bin/pg_upgrade/function.c
@@ -42,6 +42,32 @@ library_name_compare(const void *p1, const void *p2)
                                          ((const LibraryInfo *) p2)->dbnum);
 }
 
+struct loadable_libraries_state
+{
+       PGresult  **ress;
+       int                     totaltups;
+};
+
+static char *
+get_loadable_libraries_query(DbInfo *dbinfo, void *arg)
+{
+       return psprintf("SELECT DISTINCT probin "
+                                       "FROM pg_catalog.pg_proc "
+                                       "WHERE prolang = %u AND "
+                                       "probin IS NOT NULL AND "
+                                       "oid >= %u;",
+                                       ClanguageId,
+                                       FirstNormalObjectId);
+}
+
+static void
+get_loadable_libraries_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       struct loadable_libraries_state *state = (struct loadable_libraries_state *) arg;
+
+       state->ress[dbinfo - old_cluster.dbarr.dbs] = res;
+       state->totaltups += PQntuples(res);
+}
 
 /*
  * get_loadable_libraries()
@@ -54,47 +80,32 @@ library_name_compare(const void *p1, const void *p2)
 void
 get_loadable_libraries(void)
 {
-       PGresult  **ress;
        int                     totaltups;
        int                     dbnum;
        int                     n_libinfos;
+       AsyncTask  *task = async_task_create();
+       struct loadable_libraries_state state;
 
-       ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
-       totaltups = 0;
+       state.ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
+       state.totaltups = 0;
 
-       /* Fetch all library names, removing duplicates within each DB */
-       for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
-       {
-               DbInfo     *active_db = &old_cluster.dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(&old_cluster, active_db->db_name);
+       async_task_add_step(task, get_loadable_libraries_query,
+                                               get_loadable_libraries_result, false, &state);
 
-               /*
-                * Fetch all libraries containing non-built-in C functions in this DB.
-                */
-               ress[dbnum] = executeQueryOrDie(conn,
-                                                                               "SELECT DISTINCT probin "
-                                                                               "FROM pg_catalog.pg_proc "
-                                                                               "WHERE prolang = %u AND "
-                                                                               "probin IS NOT NULL AND "
-                                                                               "oid >= %u;",
-                                                                               ClanguageId,
-                                                                               FirstNormalObjectId);
-               totaltups += PQntuples(ress[dbnum]);
-
-               PQfinish(conn);
-       }
+       async_task_run(task, &old_cluster);
+       async_task_free(task);
 
        /*
         * Allocate memory for required libraries and logical replication output
         * plugins.
         */
-       n_libinfos = totaltups + count_old_cluster_logical_slots();
+       n_libinfos = state.totaltups + count_old_cluster_logical_slots();
        os_info.libraries = (LibraryInfo *) pg_malloc(sizeof(LibraryInfo) * n_libinfos);
        totaltups = 0;
 
        for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
        {
-               PGresult   *res = ress[dbnum];
+               PGresult   *res = state.ress[dbnum];
                int                     ntups;
                int                     rowno;
                LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
@@ -129,7 +140,7 @@ get_loadable_libraries(void)
                }
        }
 
-       pg_free(ress);
+       pg_free(state.ress);
 
        os_info.num_libraries = totaltups;
 }
-- 
2.39.3 (Apple Git-146)

From 7a420ff039d48c54cbb4d06647f039257a807bb9 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 21:31:57 -0500
Subject: [PATCH v2 6/6] use new pg_upgrade async API to parallelize reporting
 extension updates

---
 src/bin/pg_upgrade/version.c | 82 ++++++++++++++++++------------------
 1 file changed, 41 insertions(+), 41 deletions(-)

diff --git a/src/bin/pg_upgrade/version.c b/src/bin/pg_upgrade/version.c
index 2de6dffccd..12783bb2ba 100644
--- a/src/bin/pg_upgrade/version.c
+++ b/src/bin/pg_upgrade/version.c
@@ -139,6 +139,42 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
                check_ok();
 }
 
+static char *
+report_extension_updates_query(DbInfo *dbinfo, void *arg)
+{
+       return pg_strdup("SELECT name "
+                                        "FROM pg_available_extensions "
+                                        "WHERE installed_version != default_version");
+}
+
+static void
+report_extension_updates_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       int                     i_name = PQfnumber(res, "name");
+       char       *output_path = "update_extensions.sql";
+       FILE      **script = (FILE **) arg;
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (*script == NULL && (*script = fopen_priv(output_path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", output_path);
+               if (!db_used)
+               {
+                       PQExpBufferData connectbuf;
+
+                       initPQExpBuffer(&connectbuf);
+                       appendPsqlMetaConnect(&connectbuf, dbinfo->db_name);
+                       fputs(connectbuf.data, *script);
+                       termPQExpBuffer(&connectbuf);
+                       db_used = true;
+               }
+               fprintf(*script, "ALTER EXTENSION %s UPDATE;\n",
+                               quote_identifier(PQgetvalue(res, rowno, i_name)));
+       }
+}
+
 /*
  * report_extension_updates()
  *     Report extensions that should be updated.
@@ -146,53 +182,17 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
 void
 report_extension_updates(ClusterInfo *cluster)
 {
-       int                     dbnum;
        FILE       *script = NULL;
        char       *output_path = "update_extensions.sql";
+       AsyncTask  *task = async_task_create();
 
        prep_status("Checking for extension updates");
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_name;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /* find extensions needing updates */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT name "
-                                                               "FROM pg_available_extensions "
-                                                               "WHERE installed_version != default_version"
-                       );
-
-               ntups = PQntuples(res);
-               i_name = PQfnumber(res, "name");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", output_path);
-                       if (!db_used)
-                       {
-                               PQExpBufferData connectbuf;
-
-                               initPQExpBuffer(&connectbuf);
-                               appendPsqlMetaConnect(&connectbuf, active_db->db_name);
-                               fputs(connectbuf.data, script);
-                               termPQExpBuffer(&connectbuf);
-                               db_used = true;
-                       }
-                       fprintf(script, "ALTER EXTENSION %s UPDATE;\n",
-                                       quote_identifier(PQgetvalue(res, rowno, i_name)));
-               }
+       async_task_add_step(task, report_extension_updates_query,
+                                               report_extension_updates_result, true, &script);
 
-               PQclear(res);
-
-               PQfinish(conn);
-       }
+       async_task_run(task, cluster);
+       async_task_free(task);
 
        if (script)
        {
-- 
2.39.3 (Apple Git-146)
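
For anyone skimming the patches, the two-callback pattern they rely on can
be sketched roughly as below.  This is only an illustration, not code from
the patch set: every name starting with "Sketch"/"sketch_" is hypothetical,
there is no libpq connection, and the loop is serial rather than
asynchronous.  The "result" handed to the second callback is just the query
text echoed back, so that the example stands alone.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins for illustration; not the patch's DbInfo/AsyncTask. */
typedef struct
{
	const char *db_name;
} SketchDb;

/* Callback 1: return a malloc'd query string for this database. */
typedef char *(*SketchQueryCb) (const SketchDb *db, void *arg);

/* Callback 2: consume the result for this database. */
typedef void (*SketchResultCb) (const SketchDb *db, const char *result,
								void *arg);

typedef struct
{
	SketchQueryCb query_cb;
	SketchResultCb result_cb;
	void	   *arg;			/* caller state shared by both callbacks */
} SketchStep;

/* Run one step against every database, one at a time. */
void
sketch_run_step(const SketchStep *step, const SketchDb *dbs, int ndbs)
{
	for (int i = 0; i < ndbs; i++)
	{
		char	   *query = step->query_cb(&dbs[i], step->arg);

		/* Assumption: no server here, so the query text doubles as the result. */
		step->result_cb(&dbs[i], query, step->arg);
		free(query);
	}
}

/* Example query callback: the same query for every database. */
char *
sketch_count_query(const SketchDb *db, void *arg)
{
	const char *text = "SELECT 1";
	char	   *query = malloc(strlen(text) + 1);

	(void) db;
	(void) arg;
	if (query == NULL)
		abort();
	strcpy(query, text);
	return query;
}

/* Example result callback: count how many databases were processed. */
void
sketch_count_result(const SketchDb *db, const char *result, void *arg)
{
	int		   *count = (int *) arg;

	(void) db;
	(void) result;
	(*count)++;
}

/* Drive the step over three made-up databases and return the tally. */
int
sketch_count_databases(void)
{
	SketchDb	dbs[] = {{"postgres"}, {"template1"}, {"app"}};
	int			count = 0;
	SketchStep	step = {sketch_count_query, sketch_count_result, &count};

	sketch_run_step(&step, dbs, 3);
	return count;
}
```

The point of the split is that the caller's state travels through the void
pointer, so the per-database loop does not need to know what each task is
collecting.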
