I finished parallelizing everything in pg_upgrade I could easily
parallelize with the proposed async task API.  There are a few remaining
places where I could not easily convert to the new API for whatever reason.
AFAICT those remaining places are either not showing up prominently in my
tests, or they only affect versions that are unsupported or will soon be
unsupported when v18 is released.  Similarly, I noticed many of the checks
follow a very similar coding pattern, but most (if not all) only affect
older versions, so I'm unsure whether it's worth the trouble to try
consolidating them into one scan through all the databases.

The code is still very rough and nowhere near committable, but this at
least gets the patch set into the editing phase.

-- 
nathan
>From a4110826fd703e100da7ca8419e4d0609b4346eb Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 11:02:44 -0500
Subject: [PATCH v4 01/12] introduce framework for parallelizing pg_upgrade
 tasks

---
 src/bin/pg_upgrade/Makefile      |   1 +
 src/bin/pg_upgrade/async.c       | 323 +++++++++++++++++++++++++++++++
 src/bin/pg_upgrade/meson.build   |   1 +
 src/bin/pg_upgrade/pg_upgrade.h  |  16 ++
 src/tools/pgindent/typedefs.list |   4 +
 5 files changed, 345 insertions(+)
 create mode 100644 src/bin/pg_upgrade/async.c

diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile
index bde91e2beb..3bc4f5d740 100644
--- a/src/bin/pg_upgrade/Makefile
+++ b/src/bin/pg_upgrade/Makefile
@@ -12,6 +12,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = \
        $(WIN32RES) \
+       async.o \
        check.o \
        controldata.o \
        dump.o \
diff --git a/src/bin/pg_upgrade/async.c b/src/bin/pg_upgrade/async.c
new file mode 100644
index 0000000000..7df1e7712d
--- /dev/null
+++ b/src/bin/pg_upgrade/async.c
@@ -0,0 +1,323 @@
+/*
+ * async.c
+ *
+ * parallelization via libpq's async APIs
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ * src/bin/pg_upgrade/async.c
+ */
+
+#include "postgres_fe.h"
+
+#include "common/connect.h"
+#include "fe_utils/string_utils.h"
+#include "pg_upgrade.h"
+
+/* number of databases for which the task's last step has been processed */
+static int     dbs_complete;
+/* index of the next database to hand to a free slot */
+static int     dbs_processing;
+
+/*
+ * One step of a task: a callback that produces the query to send to a
+ * database and a callback that consumes that query's result.
+ */
+typedef struct AsyncTaskCallbacks
+{
+       AsyncTaskGetQueryCB query_cb;   /* returns the query text; the result is pg_free'd after sending */
+       AsyncTaskProcessCB process_cb;  /* called with the query's last result */
+       bool            free_result;    /* PQclear() the result after process_cb returns? */
+       void       *arg;                /* opaque pointer passed to both callbacks */
+} AsyncTaskCallbacks;
+
+/*
+ * A task is an ordered list of steps; each step is run, in order, against
+ * every database of a cluster.
+ */
+typedef struct AsyncTask
+{
+       AsyncTaskCallbacks *cbs;        /* array of steps, grown by async_task_add_step() */
+       int                     num_cb_sets;    /* number of entries in cbs */
+} AsyncTask;
+
+/*
+ * States of a connection slot.  FREE must remain the first (zero) value:
+ * slots are allocated zero-filled and are reset with memset(), and both
+ * rely on an all-zero slot being FREE.
+ */
+typedef enum
+{
+       FREE,                           /* no database assigned */
+       CONNECTING,                     /* PQconnectStart() issued, connection not yet established */
+       SETTING_SEARCH_PATH,            /* ALWAYS_SECURE_SEARCH_PATH_SQL sent, result pending */
+       RUNNING_QUERY,                  /* a task step's query sent, result pending */
+} AsyncSlotState;
+
+/*
+ * Per-connection bookkeeping for async_task_run().
+ */
+typedef struct
+{
+       AsyncSlotState state;           /* where this slot is in its state machine */
+       int                     db;             /* index into cluster->dbarr.dbs of the assigned database */
+       int                     query;          /* index into task->cbs of the current step */
+       PGconn     *conn;               /* libpq connection for the assigned database */
+} AsyncSlot;
+
+/*
+ * Allocate an empty task (no steps registered).  Release it with
+ * async_task_free().
+ */
+AsyncTask *
+async_task_create(void)
+{
+       AsyncTask  *task = pg_malloc0(sizeof(AsyncTask));
+
+       return task;
+}
+
+/*
+ * Release a task created by async_task_create(), including its array of
+ * registered steps.  Passing NULL is a no-op.
+ */
+void
+async_task_free(AsyncTask *task)
+{
+       if (task == NULL)
+               return;
+
+       /* pg_free() accepts NULL, so a task without steps needs no guard */
+       pg_free(task->cbs);
+       pg_free(task);
+}
+
+/*
+ * Append a step to the task.
+ *
+ * query_cb produces the query to run in each database, process_cb receives
+ * the result, free_result says whether the result is PQclear()'d once
+ * process_cb returns, and arg is passed through to both callbacks.
+ */
+void
+async_task_add_step(AsyncTask *task,
+                                       AsyncTaskGetQueryCB query_cb,
+                                       AsyncTaskProcessCB process_cb, bool free_result,
+                                       void *arg)
+{
+       int                     idx = task->num_cb_sets;
+       AsyncTaskCallbacks *step;
+
+       /* grow the array by one entry */
+       task->num_cb_sets = idx + 1;
+       task->cbs = pg_realloc(task->cbs,
+                                                  task->num_cb_sets * sizeof(AsyncTaskCallbacks));
+
+       /* fill in the newly added slot */
+       step = &task->cbs[idx];
+       step->arg = arg;
+       step->free_result = free_result;
+       step->process_cb = process_cb;
+       step->query_cb = query_cb;
+}
+
+/*
+ * Report the connection's current libpq error message and terminate the
+ * program with exit status 1.  Does not return.
+ */
+static void
+conn_failure(PGconn *conn)
+{
+       const char *errmsg = PQerrorMessage(conn);
+
+       pg_log(PG_REPORT, "%s", errmsg);
+       printf(_("Failure, exiting\n"));
+       exit(1);
+}
+
+/*
+ * Begin a nonblocking connection to the database assigned to the slot
+ * (slot->db) and store the new connection in slot->conn.
+ *
+ * The connection string is built from the database name, os_info.user, the
+ * cluster's port and, if set, its socket directory.  Exits via
+ * conn_failure() if the attempt cannot be started.
+ */
+static void
+start_conn(const ClusterInfo *cluster, AsyncSlot *slot)
+{
+       PQExpBufferData conn_opts;
+
+       /* Build connection string with proper quoting */
+       initPQExpBuffer(&conn_opts);
+       appendPQExpBufferStr(&conn_opts, "dbname=");
+       appendConnStrVal(&conn_opts, cluster->dbarr.dbs[slot->db].db_name);
+       appendPQExpBufferStr(&conn_opts, " user=");
+       appendConnStrVal(&conn_opts, os_info.user);
+       appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
+       if (cluster->sockdir)
+       {
+               appendPQExpBufferStr(&conn_opts, " host=");
+               appendConnStrVal(&conn_opts, cluster->sockdir);
+       }
+
+       slot->conn = PQconnectStart(conn_opts.data);
+       termPQExpBuffer(&conn_opts);
+
+       /*
+        * A NULL result means libpq could not allocate the PGconn.  A non-NULL
+        * result whose status is already CONNECTION_BAD means the attempt failed
+        * immediately (e.g. invalid connection parameters), so there is nothing
+        * to poll for.
+        */
+       if (slot->conn == NULL || PQstatus(slot->conn) == CONNECTION_BAD)
+               conn_failure(slot->conn);
+}
+
+/*
+ * Send the query for the slot's current task step (slot->query) to the
+ * slot's connection without waiting for the result.  The query text comes
+ * from the step's query callback and is freed once it has been sent.
+ */
+static void
+dispatch_query(const ClusterInfo *cluster, AsyncSlot *slot,
+                          const AsyncTask *task)
+{
+       AsyncTaskCallbacks *step = &task->cbs[slot->query];
+       char       *sql;
+
+       /* ask the step's callback for the query to run in this database */
+       sql = (*step->query_cb) (&cluster->dbarr.dbs[slot->db], step->arg);
+
+       if (PQsendQuery(slot->conn, sql) == 0)
+               conn_failure(slot->conn);
+
+       pg_free(sql);
+}
+
+/*
+ * Drain every pending result from the connection and return the last one,
+ * clearing all earlier results.  Exits via conn_failure() if the connection
+ * goes bad while draining, or if the last result's status is neither
+ * PGRES_COMMAND_OK nor PGRES_TUPLES_OK.  The caller owns the returned result.
+ */
+static PGresult *
+get_last_result(PGconn *conn)
+{
+       PGresult   *last = NULL;
+       ExecStatusType status;
+
+       for (;;)
+       {
+               PGresult   *next = PQgetResult(conn);
+
+               if (next == NULL)
+                       break;
+
+               /* keep only the most recent result */
+               PQclear(last);
+               last = next;
+
+               if (PQstatus(conn) == CONNECTION_BAD)
+                       conn_failure(conn);
+       }
+
+       status = PQresultStatus(last);
+       if (!(status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK))
+               conn_failure(conn);
+
+       return last;
+}
+
+/*
+ * Fetch the result of the slot's current task step and pass it to the step's
+ * process callback, along with the database's DbInfo and the step's arg.
+ * The result is cleared afterwards only if the step was registered with
+ * free_result set.
+ */
+static void
+process_query_result(const ClusterInfo *cluster, AsyncSlot *slot,
+                                        const AsyncTask *task)
+{
+       AsyncTaskCallbacks *step = &task->cbs[slot->query];
+       PGresult   *result;
+
+       result = get_last_result(slot->conn);
+
+       (*step->process_cb) (&cluster->dbarr.dbs[slot->db], result, step->arg);
+
+       if (!step->free_result)
+               return;
+
+       PQclear(result);
+}
+
+/*
+ * Advance one slot by at most one step of its state machine:
+ *
+ *     FREE -> CONNECTING -> SETTING_SEARCH_PATH -> RUNNING_QUERY -> FREE
+ *
+ * with RUNNING_QUERY repeated once per task step.  Returns without changing
+ * state if the connection or the outstanding query is not ready yet, so it
+ * is meant to be called repeatedly.  Any libpq failure exits via
+ * conn_failure().
+ */
+static void
+process_slot(const ClusterInfo *cluster, AsyncSlot *slot, const AsyncTask *task)
+{
+       switch (slot->state)
+       {
+               case FREE:
+                       /* nothing to do once every database has been handed out */
+                       if (dbs_processing >= cluster->dbarr.ndbs)
+                               return;
+                       slot->db = dbs_processing++;
+                       slot->state = CONNECTING;
+                       start_conn(cluster, slot);
+                       return;
+
+               case CONNECTING:
+                       {
+                               PostgresPollingStatusType status;
+
+                               /*
+                                * Poll exactly once and act on that single result.
+                                * PQconnectPoll() advances the connection handshake, so
+                                * calling it again just to re-read the status would take
+                                * another step without waiting on the socket and could
+                                * report a different outcome than the one just checked.
+                                */
+                               status = PQconnectPoll(slot->conn);
+                               if (status == PGRES_POLLING_FAILED)
+                                       conn_failure(slot->conn);
+                               if (status != PGRES_POLLING_OK)
+                                       return;
+                       }
+                       slot->state = SETTING_SEARCH_PATH;
+                       if (!PQsendQuery(slot->conn, ALWAYS_SECURE_SEARCH_PATH_SQL))
+                               conn_failure(slot->conn);
+                       return;
+
+               case SETTING_SEARCH_PATH:
+                       if (!PQconsumeInput(slot->conn))
+                               conn_failure(slot->conn);
+                       if (PQisBusy(slot->conn))
+                               return;
+                       PQclear(get_last_result(slot->conn));
+                       slot->state = RUNNING_QUERY;
+                       dispatch_query(cluster, slot, task);
+                       return;
+
+               case RUNNING_QUERY:
+                       if (!PQconsumeInput(slot->conn))
+                               conn_failure(slot->conn);
+                       if (PQisBusy(slot->conn))
+                               return;
+                       process_query_result(cluster, slot, task);
+                       if (++slot->query >= task->num_cb_sets)
+                       {
+                               /*
+                                * Last step done for this database.  Zeroing the slot
+                                * returns it to FREE and resets its db/query indexes.
+                                */
+                               dbs_complete++;
+                               PQfinish(slot->conn);
+                               memset(slot, 0, sizeof(AsyncSlot));
+                               return;
+                       }
+                       dispatch_query(cluster, slot, task);
+                       return;
+       }
+}
+
+/*
+ * Wait on the slots to either finish connecting or to receive query results if
+ * possible.  This avoids a tight loop in async_task_run().
+ *
+ * slots is the array of numslots slots managed by async_task_run().  The
+ * function returns without waiting as soon as it finds a slot that can be
+ * acted on right away (a FREE slot, or a CONNECTING slot that is neither
+ * waiting to read nor to write).  Otherwise it blocks in select() with no
+ * timeout until one of the collected sockets is ready.
+ *
+ * NOTE(review): select()'s return value is discarded, so an error there just
+ * sends the caller around its loop again -- confirm that is intended.
+ * NOTE(review): sockets are added with FD_SET() without a check against
+ * FD_SETSIZE -- confirm this is acceptable for large job counts.
+ */
+static void
+wait_on_slots(AsyncSlot *slots, int numslots)
+{
+       fd_set          input_mask;
+       fd_set          output_mask;
+       fd_set          except_mask;
+       int                     maxFd = 0;
+
+       FD_ZERO(&input_mask);
+       FD_ZERO(&output_mask);
+       FD_ZERO(&except_mask);
+
+       for (int i = 0; i < numslots; i++)
+       {
+               int                     sock;
+               bool            read = false;
+
+               switch (slots[i].state)
+               {
+                       case FREE:
+
+                               /*
+                                * If we see a free slot, return right away so that it can be
+                                * reused immediately for the next database.  This might cause
+                                * us to spin more than necessary as we finish processing the
+                                * last few databases, but that shouldn't cause too much harm.
+                                */
+                               return;
+
+                       case CONNECTING:
+
+                               /*
+                                * If we are waiting for the connection to establish, choose
+                                * whether to wait for reading or for writing on the socket as
+                                * appropriate.  If neither apply, just return immediately so
+                                * that we can handle the slot.
+                                *
+                                * NOTE(review): process_slot() also calls PQconnectPoll() on
+                                * CONNECTING slots, so the connection is polled from two
+                                * places per loop iteration -- confirm this is safe.
+                                */
+                               {
+                                       PostgresPollingStatusType status;
+
+                                       status = PQconnectPoll(slots[i].conn);
+                                       if (status == PGRES_POLLING_READING)
+                                               read = true;
+                                       else if (status != PGRES_POLLING_WRITING)
+                                               return;
+                               }
+                               break;
+
+                       case SETTING_SEARCH_PATH:
+                       case RUNNING_QUERY:
+
+                               /*
+                                * If we've sent a query, we must wait for the socket to be
+                                * read-ready.  Note that process_slot() handles calling
+                                * PQconsumeInput() as required.
+                                */
+                               read = true;
+                               break;
+               }
+
+               /*
+                * If there's some problem retrieving the socket, just pretend this
+                * slot doesn't exist.  We don't expect this to happen regularly in
+                * practice, so it seems unlikely to cause too much harm.
+                */
+               sock = PQsocket(slots[i].conn);
+               if (sock < 0)
+                       continue;
+
+               /*
+                * Add the socket to the read or write set as chosen above, and always
+                * to the exception set.
+                */
+               FD_SET(sock, read ? &input_mask : &output_mask);
+               FD_SET(sock, &except_mask);
+               maxFd = Max(maxFd, sock);
+       }
+
+       /*
+        * If we found socket(s) to wait on, wait.  maxFd stays 0 when no socket
+        * was added, in which case we return without waiting.
+        */
+       if (maxFd != 0)
+               (void) select(maxFd + 1, &input_mask, &output_mask, &except_mask, NULL);
+}
+
+/*
+ * Run every step of the task against every database in the cluster, using
+ * up to user_opts.jobs (at least one) connections at a time.  Returns once
+ * all databases have been processed; failures exit the program from within
+ * the slot-processing code.
+ */
+void
+async_task_run(const AsyncTask *task, const ClusterInfo *cluster)
+{
+       int                     nslots = Max(1, user_opts.jobs);
+       AsyncSlot  *slots;
+
+       /* zero-filled slots start out in the FREE state */
+       slots = pg_malloc0(sizeof(AsyncSlot) * nslots);
+
+       dbs_processing = 0;
+       dbs_complete = 0;
+
+       while (dbs_complete < cluster->dbarr.ndbs)
+       {
+               AsyncSlot  *slot;
+
+               /* give each slot a chance to make progress ... */
+               for (slot = slots; slot < slots + nslots; slot++)
+                       process_slot(cluster, slot, task);
+
+               /* ... then sleep until some slot has something to do */
+               wait_on_slots(slots, nslots);
+       }
+
+       pg_free(slots);
+}
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 9825fa3305..9eb48e176c 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -1,6 +1,7 @@
 # Copyright (c) 2022-2024, PostgreSQL Global Development Group
 
 pg_upgrade_sources = files(
+  'async.c',
   'check.c',
   'controldata.c',
   'dump.c',
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 8afe240bdf..1ebad3bd74 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -494,3 +494,19 @@ void               parallel_transfer_all_new_dbs(DbInfoArr 
*old_db_arr, DbInfoArr *new_db_arr
                                                                                
  char *old_pgdata, char *new_pgdata,
                                                                                
  char *old_tablespace);
 bool           reap_child(bool wait_for_child);
+
+/* async.c */
+
+typedef char *(*AsyncTaskGetQueryCB) (DbInfo *dbinfo, void *arg);
+typedef void (*AsyncTaskProcessCB) (DbInfo *dbinfo, PGresult *res, void *arg);
+
+/* struct definition is private to async.c */
+typedef struct AsyncTask AsyncTask;
+
+AsyncTask  *async_task_create(void);
+void           async_task_add_step(AsyncTask *task,
+                                                               
AsyncTaskGetQueryCB query_cb,
+                                                               
AsyncTaskProcessCB process_cb, bool free_result,
+                                                               void *arg);
+void           async_task_run(const AsyncTask *task, const ClusterInfo 
*cluster);
+void           async_task_free(AsyncTask *task);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 635e6d6e21..bbd8827c2d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -153,6 +153,10 @@ ArrayMetaState
 ArraySubWorkspace
 ArrayToken
 ArrayType
+AsyncSlot
+AsyncSlotState
+AsyncTask
+AsyncTaskCallbacks
 AsyncQueueControl
 AsyncQueueEntry
 AsyncRequest
-- 
2.39.3 (Apple Git-146)

>From 7997a6c15c6b348fb0c0941ddf99c90364cfb21c Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 17:21:19 -0500
Subject: [PATCH v4 02/12] use new pg_upgrade async API for subscription state
 checks

---
 src/bin/pg_upgrade/check.c | 200 ++++++++++++++++++++-----------------
 1 file changed, 106 insertions(+), 94 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 27924159d6..f653fa25a5 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1906,6 +1906,75 @@ check_old_cluster_for_valid_slots(bool live_check)
        check_ok();
 }
 
+/* private state for subscription state checks */
+struct substate_info
+{
+       FILE       *script;             /* report file; NULL until the first problem is written */
+       char            output_path[MAXPGPATH]; /* path of the report file */
+};
+
+/*
+ * We don't allow upgrade if there is a risk of dangling slot or origin
+ * corresponding to initial sync after upgrade.
+ *
+ * A slot/origin not created yet refers to the 'i' (initialize) state, while
+ * 'r' (ready) state refers to a slot/origin created previously but already
+ * dropped. These states are supported for pg_upgrade. The other states listed
+ * below are not supported:
+ *
+ * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state would
+ * retain a replication slot, which could not be dropped by the sync worker
+ * spawned after the upgrade because the subscription ID used for the slot name
+ * won't match anymore.
+ *
+ * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state would
+ * retain the replication origin when there is a failure in tablesync worker
+ * immediately after dropping the replication slot in the publisher.
+ *
+ * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on a
+ * relation upgraded while in this state would expect an origin ID with the OID
+ * of the subscription used before the upgrade, causing it to fail.
+ *
+ * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and SUBREL_STATE_UNKNOWN:
+ * These states are not stored in the catalog, so we need not allow these
+ * states.
+ */
+static char *
+sub_query(DbInfo *dbinfo, void *arg)
+{
+       /*
+        * The query text is the same for every database, so neither argument is
+        * consulted.  It selects subscription relations whose state is anything
+        * other than 'i' or 'r'.
+        */
+       static const char *const query =
+               "SELECT r.srsubstate, s.subname, n.nspname, c.relname "
+               "FROM pg_catalog.pg_subscription_rel r "
+               "LEFT JOIN pg_catalog.pg_subscription s"
+               "   ON r.srsubid = s.oid "
+               "LEFT JOIN pg_catalog.pg_class c"
+               "   ON r.srrelid = c.oid "
+               "LEFT JOIN pg_catalog.pg_namespace n"
+               "   ON c.relnamespace = n.oid "
+               "WHERE r.srsubstate NOT IN ('i', 'r') "
+               "ORDER BY s.subname";
+
+       /* return a heap-allocated copy of the query text */
+       return pg_strdup(query);
+}
+
+/*
+ * Result callback for sub_query(): write one line to the report file for
+ * every row returned for this database.  The file is opened on first use, so
+ * it only exists if some database returned rows.  arg is the struct
+ * substate_info shared by all databases.
+ */
+static void
+sub_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       struct substate_info *info = (struct substate_info *) arg;
+       int                     nrows = PQntuples(res);
+
+       /* no offending relations in this database */
+       if (nrows == 0)
+               return;
+
+       /* open the report lazily, the first time there is something to write */
+       if (info->script == NULL)
+       {
+               info->script = fopen_priv(info->output_path, "w");
+               if (info->script == NULL)
+                       pg_fatal("could not open file \"%s\": %m", info->output_path);
+       }
+
+       for (int row = 0; row < nrows; row++)
+       {
+               const char *substate = PQgetvalue(res, row, 0);
+               const char *subname = PQgetvalue(res, row, 1);
+               const char *nspname = PQgetvalue(res, row, 2);
+               const char *relname = PQgetvalue(res, row, 3);
+
+               fprintf(info->script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
+                               substate,
+                               dbinfo->db_name,
+                               subname,
+                               nspname,
+                               relname);
+       }
+}
+
 /*
  * check_old_cluster_subscription_state()
  *
@@ -1916,115 +1985,58 @@ check_old_cluster_for_valid_slots(bool live_check)
 static void
 check_old_cluster_subscription_state(void)
 {
-       FILE       *script = NULL;
-       char            output_path[MAXPGPATH];
+       AsyncTask  *task = async_task_create();
+       struct substate_info state;
+       PGresult   *res;
+       PGconn     *conn;
        int                     ntup;
 
        prep_status("Checking for subscription state");
 
-       snprintf(output_path, sizeof(output_path), "%s/%s",
+       state.script = NULL;
+       snprintf(state.output_path, sizeof(state.output_path), "%s/%s",
                         log_opts.basedir,
                         "subs_invalid.txt");
-       for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               DbInfo     *active_db = &old_cluster.dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(&old_cluster, 
active_db->db_name);
 
-               /* We need to check for pg_replication_origin only once. */
-               if (dbnum == 0)
-               {
-                       /*
-                        * Check that all the subscriptions have their 
respective
-                        * replication origin.
-                        */
-                       res = executeQueryOrDie(conn,
-                                                                       "SELECT 
d.datname, s.subname "
-                                                                       "FROM 
pg_catalog.pg_subscription s "
-                                                                       "LEFT 
OUTER JOIN pg_catalog.pg_replication_origin o "
-                                                                       "       
ON o.roname = 'pg_' || s.oid "
-                                                                       "INNER 
JOIN pg_catalog.pg_database d "
-                                                                       "       
ON d.oid = s.subdbid "
-                                                                       "WHERE 
o.roname IS NULL;");
-
-                       ntup = PQntuples(res);
-                       for (int i = 0; i < ntup; i++)
-                       {
-                               if (script == NULL && (script = 
fopen_priv(output_path, "w")) == NULL)
-                                       pg_fatal("could not open file \"%s\": 
%m", output_path);
-                               fprintf(script, "The replication origin is 
missing for database:\"%s\" subscription:\"%s\"\n",
-                                               PQgetvalue(res, i, 0),
-                                               PQgetvalue(res, i, 1));
-                       }
-                       PQclear(res);
-               }
-
-               /*
-                * We don't allow upgrade if there is a risk of dangling slot or
-                * origin corresponding to initial sync after upgrade.
-                *
-                * A slot/origin not created yet refers to the 'i' (initialize) 
state,
-                * while 'r' (ready) state refers to a slot/origin created 
previously
-                * but already dropped. These states are supported for 
pg_upgrade. The
-                * other states listed below are not supported:
-                *
-                * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this 
state
-                * would retain a replication slot, which could not be dropped 
by the
-                * sync worker spawned after the upgrade because the 
subscription ID
-                * used for the slot name won't match anymore.
-                *
-                * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this 
state
-                * would retain the replication origin when there is a failure 
in
-                * tablesync worker immediately after dropping the replication 
slot in
-                * the publisher.
-                *
-                * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to 
work on
-                * a relation upgraded while in this state would expect an 
origin ID
-                * with the OID of the subscription used before the upgrade, 
causing
-                * it to fail.
-                *
-                * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
-                * SUBREL_STATE_UNKNOWN: These states are not stored in the 
catalog,
-                * so we need not allow these states.
-                */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT 
r.srsubstate, s.subname, n.nspname, c.relname "
-                                                               "FROM 
pg_catalog.pg_subscription_rel r "
-                                                               "LEFT JOIN 
pg_catalog.pg_subscription s"
-                                                               "       ON 
r.srsubid = s.oid "
-                                                               "LEFT JOIN 
pg_catalog.pg_class c"
-                                                               "       ON 
r.srrelid = c.oid "
-                                                               "LEFT JOIN 
pg_catalog.pg_namespace n"
-                                                               "       ON 
c.relnamespace = n.oid "
-                                                               "WHERE 
r.srsubstate NOT IN ('i', 'r') "
-                                                               "ORDER BY 
s.subname");
-
-               ntup = PQntuples(res);
-               for (int i = 0; i < ntup; i++)
-               {
-                       if (script == NULL && (script = fopen_priv(output_path, 
"w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
+       /*
+        * Check that all the subscriptions have their respective replication
+        * origin.  This check only needs to run once.
+        */
+       conn = connectToServer(&old_cluster, old_cluster.dbarr.dbs[0].db_name);
+       res = executeQueryOrDie(conn,
+                                                       "SELECT d.datname, 
s.subname "
+                                                       "FROM 
pg_catalog.pg_subscription s "
+                                                       "LEFT OUTER JOIN 
pg_catalog.pg_replication_origin o "
+                                                       "       ON o.roname = 
'pg_' || s.oid "
+                                                       "INNER JOIN 
pg_catalog.pg_database d "
+                                                       "       ON d.oid = 
s.subdbid "
+                                                       "WHERE o.roname IS 
NULL;");
+       ntup = PQntuples(res);
+       for (int i = 0; i < ntup; i++)
+       {
+               if (state.script == NULL &&
+                       (state.script = fopen_priv(state.output_path, "w")) == 
NULL)
+                       pg_fatal("could not open file \"%s\": %m", 
state.output_path);
+               fprintf(state.script, "The replication origin is missing for 
database:\"%s\" subscription:\"%s\"\n",
+                               PQgetvalue(res, i, 0),
+                               PQgetvalue(res, i, 1));
+       }
+       PQclear(res);
+       PQfinish(conn);
 
-                       fprintf(script, "The table sync state \"%s\" is not 
allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" 
relation:\"%s\"\n",
-                                       PQgetvalue(res, i, 0),
-                                       active_db->db_name,
-                                       PQgetvalue(res, i, 1),
-                                       PQgetvalue(res, i, 2),
-                                       PQgetvalue(res, i, 3));
-               }
+       async_task_add_step(task, sub_query, sub_process, true, &state);
 
-               PQclear(res);
-               PQfinish(conn);
-       }
+       async_task_run(task, &old_cluster);
+       async_task_free(task);
 
-       if (script)
+       if (state.script)
        {
-               fclose(script);
+               fclose(state.script);
                pg_log(PG_REPORT, "fatal");
                pg_fatal("Your installation contains subscriptions without 
origin or having relations not in i (initialize) or r (ready) state.\n"
                                 "You can allow the initial sync to finish for 
all relations and then restart the upgrade.\n"
                                 "A list of the problematic subscriptions is in 
the file:\n"
-                                "    %s", output_path);
+                                "    %s", state.output_path);
        }
        else
                check_ok();
-- 
2.39.3 (Apple Git-146)

>From 9616fb82ef2a77a4c13b943118bb776d0f1a5a1c Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Tue, 21 May 2024 16:35:19 -0500
Subject: [PATCH v4 03/12] move live_check variable to user_opts

---
 src/bin/pg_upgrade/check.c       | 32 ++++++++++++++++----------------
 src/bin/pg_upgrade/controldata.c |  5 +++--
 src/bin/pg_upgrade/info.c        | 12 +++++-------
 src/bin/pg_upgrade/option.c      |  4 ++--
 src/bin/pg_upgrade/pg_upgrade.c  | 21 ++++++++++-----------
 src/bin/pg_upgrade/pg_upgrade.h  | 13 +++++++------
 6 files changed, 43 insertions(+), 44 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index f653fa25a5..251f3d9017 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -29,7 +29,7 @@ static void check_for_new_tablespace_dir(void);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
 static void check_new_cluster_logical_replication_slots(void);
 static void check_new_cluster_subscription_configuration(void);
-static void check_old_cluster_for_valid_slots(bool live_check);
+static void check_old_cluster_for_valid_slots(void);
 static void check_old_cluster_subscription_state(void);
 
 /*
@@ -555,9 +555,9 @@ fix_path_separator(char *path)
 }
 
 void
-output_check_banner(bool live_check)
+output_check_banner(void)
 {
-       if (user_opts.check && live_check)
+       if (user_opts.live_check)
        {
                pg_log(PG_REPORT,
                           "Performing Consistency Checks on Old Live Server\n"
@@ -573,18 +573,18 @@ output_check_banner(bool live_check)
 
 
 void
-check_and_dump_old_cluster(bool live_check)
+check_and_dump_old_cluster(void)
 {
        /* -- OLD -- */
 
-       if (!live_check)
+       if (!user_opts.live_check)
                start_postmaster(&old_cluster, true);
 
        /*
         * Extract a list of databases, tables, and logical replication slots 
from
         * the old cluster.
         */
-       get_db_rel_and_slot_infos(&old_cluster, live_check);
+       get_db_rel_and_slot_infos(&old_cluster);
 
        init_tablespaces();
 
@@ -605,7 +605,7 @@ check_and_dump_old_cluster(bool live_check)
                 * Logical replication slots can be migrated since PG17. See 
comments
                 * atop get_old_cluster_logical_slot_infos().
                 */
-               check_old_cluster_for_valid_slots(live_check);
+               check_old_cluster_for_valid_slots();
 
                /*
                 * Subscriptions and their dependencies can be migrated since 
PG17.
@@ -652,7 +652,7 @@ check_and_dump_old_cluster(bool live_check)
         */
        if (GET_MAJOR_VERSION(old_cluster.major_version) <= 906)
        {
-               if (user_opts.check)
+               if (user_opts.live_check)
                        old_9_6_invalidate_hash_indexes(&old_cluster, true);
        }
 
@@ -667,7 +667,7 @@ check_and_dump_old_cluster(bool live_check)
        if (!user_opts.check)
                generate_old_dump();
 
-       if (!live_check)
+       if (!user_opts.live_check)
                stop_postmaster(false);
 }
 
@@ -675,7 +675,7 @@ check_and_dump_old_cluster(bool live_check)
 void
 check_new_cluster(void)
 {
-       get_db_rel_and_slot_infos(&new_cluster, false);
+       get_db_rel_and_slot_infos(&new_cluster);
 
        check_new_cluster_is_empty();
 
@@ -826,14 +826,14 @@ check_cluster_versions(void)
 
 
 void
-check_cluster_compatibility(bool live_check)
+check_cluster_compatibility(void)
 {
        /* get/check pg_control data of servers */
-       get_control_data(&old_cluster, live_check);
-       get_control_data(&new_cluster, false);
+       get_control_data(&old_cluster);
+       get_control_data(&new_cluster);
        check_control_data(&old_cluster.controldata, &new_cluster.controldata);
 
-       if (live_check && old_cluster.port == new_cluster.port)
+       if (user_opts.live_check && old_cluster.port == new_cluster.port)
                pg_fatal("When checking a live server, "
                                 "the old and new port numbers must be 
different.");
 }
@@ -1839,7 +1839,7 @@ check_new_cluster_subscription_configuration(void)
  * before shutdown.
  */
 static void
-check_old_cluster_for_valid_slots(bool live_check)
+check_old_cluster_for_valid_slots(void)
 {
        char            output_path[MAXPGPATH];
        FILE       *script = NULL;
@@ -1878,7 +1878,7 @@ check_old_cluster_for_valid_slots(bool live_check)
                         * Note: This can be satisfied only when the old cluster has been
                         * shut down, so we skip this for live checks.
                         */
-                       if (!live_check && !slot->caught_up)
+                       if (!user_opts.live_check && !slot->caught_up)
                        {
                                if (script == NULL &&
                                        (script = fopen_priv(output_path, "w")) 
== NULL)
diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c
index 1f0ccea3ed..cf665b9dee 100644
--- a/src/bin/pg_upgrade/controldata.c
+++ b/src/bin/pg_upgrade/controldata.c
@@ -33,7 +33,7 @@
  * return valid xid data for a running server.
  */
 void
-get_control_data(ClusterInfo *cluster, bool live_check)
+get_control_data(ClusterInfo *cluster)
 {
        char            cmd[MAXPGPATH];
        char            bufin[MAX_STRING];
@@ -76,6 +76,7 @@ get_control_data(ClusterInfo *cluster, bool live_check)
        uint32          segno = 0;
        char       *resetwal_bin;
        int                     rc;
+       bool            live_check = (cluster == &old_cluster && 
user_opts.live_check);
 
        /*
         * Because we test the pg_resetwal output as strings, it has to be in
@@ -118,7 +119,7 @@ get_control_data(ClusterInfo *cluster, bool live_check)
        /*
         * Check for clean shutdown
         */
-       if (!live_check || cluster == &new_cluster)
+       if (!live_check)
        {
                /* only pg_controldata outputs the cluster state */
                snprintf(cmd, sizeof(cmd), "\"%s/pg_controldata\" \"%s\"",
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 95c22a7200..8f1777de59 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -27,7 +27,7 @@ static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
 static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool 
live_check);
+static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo);
 static void get_db_subscription_count(DbInfo *dbinfo);
 
 
@@ -273,11 +273,9 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
  *
  * higher level routine to generate dbinfos for the database running
  * on the given "port". Assumes that server is already running.
- *
- * live_check would be used only when the target is the old cluster.
  */
 void
-get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check)
+get_db_rel_and_slot_infos(ClusterInfo *cluster)
 {
        int                     dbnum;
 
@@ -299,7 +297,7 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check)
                 */
                if (cluster == &old_cluster)
                {
-                       get_old_cluster_logical_slot_infos(pDbInfo, live_check);
+                       get_old_cluster_logical_slot_infos(pDbInfo);
                        get_db_subscription_count(pDbInfo);
                }
        }
@@ -645,7 +643,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
  * are included.
  */
 static void
-get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
+get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
 {
        PGconn     *conn;
        PGresult   *res;
@@ -681,7 +679,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
                                                        "WHERE slot_type = 
'logical' AND "
                                                        "database = 
current_database() AND "
                                                        "temporary IS FALSE;",
-                                                       live_check ? "FALSE" :
+                                                       user_opts.live_check ? 
"FALSE" :
                                                        "(CASE WHEN 
invalidation_reason IS NOT NULL THEN FALSE "
                                                        "ELSE (SELECT 
pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
                                                        "END)");
diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index 548ea4e623..6f41d63eed 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -470,10 +470,10 @@ adjust_data_dir(ClusterInfo *cluster)
  * directory.
  */
 void
-get_sock_dir(ClusterInfo *cluster, bool live_check)
+get_sock_dir(ClusterInfo *cluster)
 {
 #if !defined(WIN32)
-       if (!live_check)
+       if (!user_opts.live_check || cluster == &new_cluster)
                cluster->sockdir = user_opts.socketdir;
        else
        {
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 03eb738fd7..99f3d4543e 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -65,7 +65,7 @@ static void create_new_objects(void);
 static void copy_xact_xlog_xid(void);
 static void set_frozenxids(bool minmxid_only);
 static void make_outputdirs(char *pgdata);
-static void setup(char *argv0, bool *live_check);
+static void setup(char *argv0);
 static void create_logical_replication_slots(void);
 
 ClusterInfo old_cluster,
@@ -88,7 +88,6 @@ int
 main(int argc, char **argv)
 {
        char       *deletion_script_file_name = NULL;
-       bool            live_check = false;
 
        /*
         * pg_upgrade doesn't currently use common/logging.c, but initialize it
@@ -123,18 +122,18 @@ main(int argc, char **argv)
         */
        make_outputdirs(new_cluster.pgdata);
 
-       setup(argv[0], &live_check);
+       setup(argv[0]);
 
-       output_check_banner(live_check);
+       output_check_banner();
 
        check_cluster_versions();
 
-       get_sock_dir(&old_cluster, live_check);
-       get_sock_dir(&new_cluster, false);
+       get_sock_dir(&old_cluster);
+       get_sock_dir(&new_cluster);
 
-       check_cluster_compatibility(live_check);
+       check_cluster_compatibility();
 
-       check_and_dump_old_cluster(live_check);
+       check_and_dump_old_cluster();
 
 
        /* -- NEW -- */
@@ -331,7 +330,7 @@ make_outputdirs(char *pgdata)
 
 
 static void
-setup(char *argv0, bool *live_check)
+setup(char *argv0)
 {
        /*
         * make sure the user has a clean environment, otherwise, we may confuse
@@ -378,7 +377,7 @@ setup(char *argv0, bool *live_check)
                                pg_fatal("There seems to be a postmaster 
servicing the old cluster.\n"
                                                 "Please shutdown that 
postmaster and try again.");
                        else
-                               *live_check = true;
+                               user_opts.live_check = true;
                }
        }
 
@@ -660,7 +659,7 @@ create_new_objects(void)
                set_frozenxids(true);
 
        /* update new_cluster info now that we have objects in the databases */
-       get_db_rel_and_slot_infos(&new_cluster, false);
+       get_db_rel_and_slot_infos(&new_cluster);
 }
 
 /*
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 1ebad3bd74..56d05d7eb9 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -322,6 +322,7 @@ typedef struct
 typedef struct
 {
        bool            check;                  /* check clusters only, don't 
change any data */
+       bool            live_check;             /* check clusters only, old 
server is running */
        bool            do_sync;                /* flush changes to disk */
        transferMode transfer_mode; /* copy files or link them? */
        int                     jobs;                   /* number of 
processes/threads to use */
@@ -366,20 +367,20 @@ extern OSInfo os_info;
 
 /* check.c */
 
-void           output_check_banner(bool live_check);
-void           check_and_dump_old_cluster(bool live_check);
+void           output_check_banner(void);
+void           check_and_dump_old_cluster(void);
 void           check_new_cluster(void);
 void           report_clusters_compatible(void);
 void           issue_warnings_and_set_wal_level(void);
 void           output_completion_banner(char *deletion_script_file_name);
 void           check_cluster_versions(void);
-void           check_cluster_compatibility(bool live_check);
+void           check_cluster_compatibility(void);
 void           create_script_for_old_cluster_deletion(char 
**deletion_script_file_name);
 
 
 /* controldata.c */
 
-void           get_control_data(ClusterInfo *cluster, bool live_check);
+void           get_control_data(ClusterInfo *cluster);
 void           check_control_data(ControlData *oldctrl, ControlData *newctrl);
 void           disable_old_cluster(void);
 
@@ -428,7 +429,7 @@ void                check_loadable_libraries(void);
 FileNameMap *gen_db_file_maps(DbInfo *old_db,
                                                          DbInfo *new_db, int 
*nmaps, const char *old_pgdata,
                                                          const char 
*new_pgdata);
-void           get_db_rel_and_slot_infos(ClusterInfo *cluster, bool 
live_check);
+void           get_db_rel_and_slot_infos(ClusterInfo *cluster);
 int                    count_old_cluster_logical_slots(void);
 int                    count_old_cluster_subscriptions(void);
 
@@ -436,7 +437,7 @@ int                 count_old_cluster_subscriptions(void);
 
 void           parseCommandLine(int argc, char *argv[]);
 void           adjust_data_dir(ClusterInfo *cluster);
-void           get_sock_dir(ClusterInfo *cluster, bool live_check);
+void           get_sock_dir(ClusterInfo *cluster);
 
 /* relfilenumber.c */
 
-- 
2.39.3 (Apple Git-146)

>From 38a7219a1170359c23393fcb896bd446d5062dab Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 21:09:33 -0500
Subject: [PATCH v4 04/12] use new pg_upgrade async API for retrieving relinfos

---
 src/bin/pg_upgrade/info.c | 187 +++++++++++++++++---------------------
 1 file changed, 81 insertions(+), 106 deletions(-)

diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 8f1777de59..d07255bd0a 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -22,13 +22,16 @@ static void report_unmatched_relation(const RelInfo *rel, const DbInfo *db,
 static void free_db_and_rel_infos(DbInfoArr *db_arr);
 static void get_template0_info(ClusterInfo *cluster);
 static void get_db_infos(ClusterInfo *cluster);
-static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
+static char *get_rel_infos_query(DbInfo *dbinfo, void *arg);
+static void get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg);
 static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
 static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo);
-static void get_db_subscription_count(DbInfo *dbinfo);
+static char *get_old_cluster_logical_slot_infos_query(DbInfo *dbinfo, void 
*arg);
+static void get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult 
*res, void *arg);
+static char *get_db_subscription_count_query(DbInfo *dbinfo, void *arg);
+static void get_db_subscription_count_result(DbInfo *dbinfo, PGresult *res, 
void *arg);
 
 
 /*
@@ -277,7 +280,7 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
 void
 get_db_rel_and_slot_infos(ClusterInfo *cluster)
 {
-       int                     dbnum;
+       AsyncTask  *task = async_task_create();
 
        if (cluster->dbarr.dbs != NULL)
                free_db_and_rel_infos(&cluster->dbarr);
@@ -285,23 +288,26 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster)
        get_template0_info(cluster);
        get_db_infos(cluster);
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+       async_task_add_step(task,
+                                               get_rel_infos_query,
+                                               get_rel_infos_result,
+                                               true, NULL);
+       if (cluster == &old_cluster &&
+               GET_MAJOR_VERSION(cluster->major_version) > 1600)
        {
-               DbInfo     *pDbInfo = &cluster->dbarr.dbs[dbnum];
-
-               get_rel_infos(cluster, pDbInfo);
-
-               /*
-                * Retrieve the logical replication slots infos and the subscriptions
-                * count for the old cluster.
-                */
-               if (cluster == &old_cluster)
-               {
-                       get_old_cluster_logical_slot_infos(pDbInfo);
-                       get_db_subscription_count(pDbInfo);
-               }
+               async_task_add_step(task,
+                                                       
get_old_cluster_logical_slot_infos_query,
+                                                       
get_old_cluster_logical_slot_infos_result,
+                                                       true, cluster);
+               async_task_add_step(task,
+                                                       
get_db_subscription_count_query,
+                                                       
get_db_subscription_count_result,
+                                                       true, cluster);
        }
 
+       async_task_run(task, cluster);
+       async_task_free(task);
+
        if (cluster == &old_cluster)
                pg_log(PG_VERBOSE, "\nsource databases:");
        else
@@ -447,30 +453,10 @@ get_db_infos(ClusterInfo *cluster)
  * Note: the resulting RelInfo array is assumed to be sorted by OID.
  * This allows later processing to match up old and new databases efficiently.
  */
-static void
-get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
+static char *
+get_rel_infos_query(DbInfo *dbinfo, void *arg)
 {
-       PGconn     *conn = connectToServer(cluster,
-                                                                          
dbinfo->db_name);
-       PGresult   *res;
-       RelInfo    *relinfos;
-       int                     ntups;
-       int                     relnum;
-       int                     num_rels = 0;
-       char       *nspname = NULL;
-       char       *relname = NULL;
-       char       *tablespace = NULL;
-       int                     i_spclocation,
-                               i_nspname,
-                               i_relname,
-                               i_reloid,
-                               i_indtable,
-                               i_toastheap,
-                               i_relfilenumber,
-                               i_reltablespace;
-       char            query[QUERY_ALLOC];
-       char       *last_namespace = NULL,
-                          *last_tablespace = NULL;
+       char       *query = pg_malloc(QUERY_ALLOC);
 
        query[0] = '\0';                        /* initialize query string to 
empty */
 
@@ -484,7 +470,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
         * output, so we have to copy that system table.  It's easiest to do that
         * by treating it as a user table.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
+       snprintf(query + strlen(query), QUERY_ALLOC - strlen(query),
                         "WITH regular_heap (reloid, indtable, toastheap) AS ( "
                         "  SELECT c.oid, 0::oid, 0::oid "
                         "  FROM pg_catalog.pg_class c JOIN 
pg_catalog.pg_namespace n "
@@ -506,7 +492,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
         * selected by the regular_heap CTE.  (We have to do this separately
         * because the namespace-name rules above don't work for toast tables.)
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
+       snprintf(query + strlen(query), QUERY_ALLOC - strlen(query),
                         "  toast_heap (reloid, indtable, toastheap) AS ( "
                         "  SELECT c.reltoastrelid, 0::oid, c.oid "
                         "  FROM regular_heap JOIN pg_catalog.pg_class c "
@@ -519,7 +505,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
         * Testing indisready is necessary in 9.2, and harmless in earlier/later
         * versions.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
+       snprintf(query + strlen(query), QUERY_ALLOC - strlen(query),
                         "  all_index (reloid, indtable, toastheap) AS ( "
                         "  SELECT indexrelid, indrelid, 0::oid "
                         "  FROM pg_catalog.pg_index "
@@ -533,7 +519,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
         * And now we can write the query that retrieves the data we want for each
         * heap and index relation.  Make sure result is sorted by OID.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
+       snprintf(query + strlen(query), QUERY_ALLOC - strlen(query),
                         "SELECT all_rels.*, n.nspname, c.relname, "
                         "  c.relfilenode, c.reltablespace, "
                         "  pg_catalog.pg_tablespace_location(t.oid) AS 
spclocation "
@@ -550,22 +536,30 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
                         "     ON c.reltablespace = t.oid "
                         "ORDER BY 1;");
 
-       res = executeQueryOrDie(conn, "%s", query);
-
-       ntups = PQntuples(res);
-
-       relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+       return query;
+}
 
-       i_reloid = PQfnumber(res, "reloid");
-       i_indtable = PQfnumber(res, "indtable");
-       i_toastheap = PQfnumber(res, "toastheap");
-       i_nspname = PQfnumber(res, "nspname");
-       i_relname = PQfnumber(res, "relname");
-       i_relfilenumber = PQfnumber(res, "relfilenode");
-       i_reltablespace = PQfnumber(res, "reltablespace");
-       i_spclocation = PQfnumber(res, "spclocation");
+static void
+get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       int                     ntups = PQntuples(res);
+       RelInfo    *relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+       int                     i_reloid = PQfnumber(res, "reloid");
+       int                     i_indtable = PQfnumber(res, "indtable");
+       int                     i_toastheap = PQfnumber(res, "toastheap");
+       int                     i_nspname = PQfnumber(res, "nspname");
+       int                     i_relname = PQfnumber(res, "relname");
+       int                     i_relfilenumber = PQfnumber(res, "relfilenode");
+       int                     i_reltablespace = PQfnumber(res, 
"reltablespace");
+       int                     i_spclocation = PQfnumber(res, "spclocation");
+       int                     num_rels = 0;
+       char       *nspname = NULL;
+       char       *relname = NULL;
+       char       *tablespace = NULL;
+       char       *last_namespace = NULL;
+       char       *last_tablespace = NULL;
 
-       for (relnum = 0; relnum < ntups; relnum++)
+       for (int relnum = 0; relnum < ntups; relnum++)
        {
                RelInfo    *curr = &relinfos[num_rels++];
 
@@ -618,9 +612,6 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
                        /* A zero reltablespace oid indicates the database tablespace. */
                        curr->tablespace = dbinfo->db_tablespace;
        }
-       PQclear(res);
-
-       PQfinish(conn);
 
        dbinfo->rel_arr.rels = relinfos;
        dbinfo->rel_arr.nrels = num_rels;
@@ -642,20 +633,9 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
  * check_old_cluster_for_valid_slots() would raise a FATAL error if such slots
  * are included.
  */
-static void
-get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
+static char *
+get_old_cluster_logical_slot_infos_query(DbInfo *dbinfo, void *arg)
 {
-       PGconn     *conn;
-       PGresult   *res;
-       LogicalSlotInfo *slotinfos = NULL;
-       int                     num_slots;
-
-       /* Logical slots can be migrated since PG17. */
-       if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
-               return;
-
-       conn = connectToServer(&old_cluster, dbinfo->db_name);
-
        /*
         * Fetch the logical replication slot information. The check whether the
         * slot is considered caught up is done by an upgrade function. This
@@ -673,18 +653,23 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
         * started and stopped several times causing any temporary slots to be
         * removed.
         */
-       res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, 
failover, "
-                                                       "%s as caught_up, 
invalidation_reason IS NOT NULL as invalid "
-                                                       "FROM 
pg_catalog.pg_replication_slots "
-                                                       "WHERE slot_type = 
'logical' AND "
-                                                       "database = 
current_database() AND "
-                                                       "temporary IS FALSE;",
-                                                       user_opts.live_check ? 
"FALSE" :
-                                                       "(CASE WHEN 
invalidation_reason IS NOT NULL THEN FALSE "
-                                                       "ELSE (SELECT 
pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
-                                                       "END)");
-
-       num_slots = PQntuples(res);
+       return psprintf("SELECT slot_name, plugin, two_phase, failover, "
+                                       "%s as caught_up, invalidation_reason 
IS NOT NULL as invalid "
+                                       "FROM pg_catalog.pg_replication_slots "
+                                       "WHERE slot_type = 'logical' AND "
+                                       "database = current_database() AND "
+                                       "temporary IS FALSE;",
+                                       user_opts.live_check ? "FALSE" :
+                                       "(CASE WHEN invalidation_reason IS NOT 
NULL THEN FALSE "
+                                       "ELSE (SELECT 
pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
+                                       "END)");
+}
+
+static void
+get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult *res, void 
*arg)
+{
+       LogicalSlotInfo *slotinfos = NULL;
+       int                     num_slots = PQntuples(res);
 
        if (num_slots)
        {
@@ -717,14 +702,10 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
                }
        }
 
-       PQclear(res);
-       PQfinish(conn);
-
        dbinfo->slot_arr.slots = slotinfos;
        dbinfo->slot_arr.nslots = num_slots;
 }
 
-
 /*
  * count_old_cluster_logical_slots()
  *
@@ -754,24 +735,18 @@ count_old_cluster_logical_slots(void)
  * This is because before that the logical slots are not upgraded, so we will
  * not be able to upgrade the logical replication clusters completely.
  */
-static void
-get_db_subscription_count(DbInfo *dbinfo)
+static char *
+get_db_subscription_count_query(DbInfo *dbinfo, void *arg)
 {
-       PGconn     *conn;
-       PGresult   *res;
-
-       /* Subscriptions can be migrated since PG17. */
-       if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700)
-               return;
+       return psprintf("SELECT count(*) "
+                                       "FROM pg_catalog.pg_subscription WHERE 
subdbid = %u",
+                                       dbinfo->db_oid);
+}
 
-       conn = connectToServer(&old_cluster, dbinfo->db_name);
-       res = executeQueryOrDie(conn, "SELECT count(*) "
-                                                       "FROM 
pg_catalog.pg_subscription WHERE subdbid = %u",
-                                                       dbinfo->db_oid);
+static void
+get_db_subscription_count_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
        dbinfo->nsubs = atoi(PQgetvalue(res, 0, 0));
-
-       PQclear(res);
-       PQfinish(conn);
 }
 
 /*
-- 
2.39.3 (Apple Git-146)

>From dbe59edcbe0d1b248456f67edc06cacfa5b06c69 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 21:24:35 -0500
Subject: [PATCH v4 05/12] use new pg_upgrade async API to parallelize getting
 loadable libraries

---
 src/bin/pg_upgrade/function.c | 63 ++++++++++++++++++++---------------
 1 file changed, 37 insertions(+), 26 deletions(-)

diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c
index 7e3abed098..c11fce0696 100644
--- a/src/bin/pg_upgrade/function.c
+++ b/src/bin/pg_upgrade/function.c
@@ -42,6 +42,32 @@ library_name_compare(const void *p1, const void *p2)
                                          ((const LibraryInfo *) p2)->dbnum);
 }
 
+struct loadable_libraries_state
+{
+       PGresult  **ress;
+       int                     totaltups;
+};
+
+static char *
+get_loadable_libraries_query(DbInfo *dbinfo, void *arg)
+{
+       return psprintf("SELECT DISTINCT probin "
+                                       "FROM pg_catalog.pg_proc "
+                                       "WHERE prolang = %u AND "
+                                       "probin IS NOT NULL AND "
+                                       "oid >= %u;",
+                                       ClanguageId,
+                                       FirstNormalObjectId);
+}
+
+static void
+get_loadable_libraries_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       struct loadable_libraries_state *state = (struct 
loadable_libraries_state *) arg;
+
+       state->ress[dbinfo - old_cluster.dbarr.dbs] = res;
+       state->totaltups += PQntuples(res);
+}
 
 /*
  * get_loadable_libraries()
@@ -54,47 +80,32 @@ library_name_compare(const void *p1, const void *p2)
 void
 get_loadable_libraries(void)
 {
-       PGresult  **ress;
        int                     totaltups;
        int                     dbnum;
        int                     n_libinfos;
+       AsyncTask  *task = async_task_create();
+       struct loadable_libraries_state state;
 
-       ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult 
*));
-       totaltups = 0;
+       state.ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * 
sizeof(PGresult *));
+       state.totaltups = 0;
 
-       /* Fetch all library names, removing duplicates within each DB */
-       for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
-       {
-               DbInfo     *active_db = &old_cluster.dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(&old_cluster, 
active_db->db_name);
+       async_task_add_step(task, get_loadable_libraries_query,
+                                               get_loadable_libraries_result, 
false, &state);
 
-               /*
-                * Fetch all libraries containing non-built-in C functions in this DB.
-                */
-               ress[dbnum] = executeQueryOrDie(conn,
-                                                                               
"SELECT DISTINCT probin "
-                                                                               
"FROM pg_catalog.pg_proc "
-                                                                               
"WHERE prolang = %u AND "
-                                                                               
"probin IS NOT NULL AND "
-                                                                               
"oid >= %u;",
-                                                                               
ClanguageId,
-                                                                               
FirstNormalObjectId);
-               totaltups += PQntuples(ress[dbnum]);
-
-               PQfinish(conn);
-       }
+       async_task_run(task, &old_cluster);
+       async_task_free(task);
 
        /*
         * Allocate memory for required libraries and logical replication output
         * plugins.
         */
-       n_libinfos = totaltups + count_old_cluster_logical_slots();
+       n_libinfos = state.totaltups + count_old_cluster_logical_slots();
        os_info.libraries = (LibraryInfo *) pg_malloc(sizeof(LibraryInfo) * 
n_libinfos);
        totaltups = 0;
 
        for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
        {
-               PGresult   *res = ress[dbnum];
+               PGresult   *res = state.ress[dbnum];
                int                     ntups;
                int                     rowno;
                LogicalSlotInfoArr *slot_arr = 
&old_cluster.dbarr.dbs[dbnum].slot_arr;
@@ -129,7 +140,7 @@ get_loadable_libraries(void)
                }
        }
 
-       pg_free(ress);
+       pg_free(state.ress);
 
        os_info.num_libraries = totaltups;
 }
-- 
2.39.3 (Apple Git-146)

>From e7a87eec5317057ad8558a2a450a8b8fa5cc007d Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 21:31:57 -0500
Subject: [PATCH v4 06/12] use new pg_upgrade async API to parallelize
 reporting extension updates

---
 src/bin/pg_upgrade/version.c | 82 ++++++++++++++++++------------------
 1 file changed, 41 insertions(+), 41 deletions(-)

diff --git a/src/bin/pg_upgrade/version.c b/src/bin/pg_upgrade/version.c
index 2de6dffccd..12783bb2ba 100644
--- a/src/bin/pg_upgrade/version.c
+++ b/src/bin/pg_upgrade/version.c
@@ -139,6 +139,42 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
                check_ok();
 }
 
+static char *
+report_extension_updates_query(DbInfo *dbinfo, void *arg)
+{
+       return pg_strdup("SELECT name "
+                                        "FROM pg_available_extensions "
+                                        "WHERE installed_version != 
default_version");
+}
+
+static void
+report_extension_updates_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       int                     i_name = PQfnumber(res, "name");
+       char       *output_path = "update_extensions.sql";
+       FILE      **script = (FILE **) arg;
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (*script == NULL && (*script = fopen_priv(output_path, "w")) 
== NULL)
+                       pg_fatal("could not open file \"%s\": %m", output_path);
+               if (!db_used)
+               {
+                       PQExpBufferData connectbuf;
+
+                       initPQExpBuffer(&connectbuf);
+                       appendPsqlMetaConnect(&connectbuf, dbinfo->db_name);
+                       fputs(connectbuf.data, *script);
+                       termPQExpBuffer(&connectbuf);
+                       db_used = true;
+               }
+               fprintf(*script, "ALTER EXTENSION %s UPDATE;\n",
+                               quote_identifier(PQgetvalue(res, rowno, 
i_name)));
+       }
+}
+
 /*
  * report_extension_updates()
  *     Report extensions that should be updated.
@@ -146,53 +182,17 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
 void
 report_extension_updates(ClusterInfo *cluster)
 {
-       int                     dbnum;
        FILE       *script = NULL;
        char       *output_path = "update_extensions.sql";
+       AsyncTask  *task = async_task_create();
 
        prep_status("Checking for extension updates");
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_name;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /* find extensions needing updates */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT name "
-                                                               "FROM 
pg_available_extensions "
-                                                               "WHERE 
installed_version != default_version"
-                       );
-
-               ntups = PQntuples(res);
-               i_name = PQfnumber(res, "name");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL && (script = fopen_priv(output_path, 
"w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
-                       if (!db_used)
-                       {
-                               PQExpBufferData connectbuf;
-
-                               initPQExpBuffer(&connectbuf);
-                               appendPsqlMetaConnect(&connectbuf, 
active_db->db_name);
-                               fputs(connectbuf.data, script);
-                               termPQExpBuffer(&connectbuf);
-                               db_used = true;
-                       }
-                       fprintf(script, "ALTER EXTENSION %s UPDATE;\n",
-                                       quote_identifier(PQgetvalue(res, rowno, 
i_name)));
-               }
+       async_task_add_step(task, report_extension_updates_query,
+                                               
report_extension_updates_result, true, &script);
 
-               PQclear(res);
-
-               PQfinish(conn);
-       }
+       async_task_run(task, cluster);
+       async_task_free(task);
 
        if (script)
        {
-- 
2.39.3 (Apple Git-146)

>From c6122d5b94375ebda9ebe7c2f78fb73b44780b78 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Sat, 6 Jul 2024 21:06:31 -0500
Subject: [PATCH v4 07/12] parallelize data type checks in pg_upgrade

---
 src/bin/pg_upgrade/check.c | 319 +++++++++++++++++++------------------
 1 file changed, 160 insertions(+), 159 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 251f3d9017..800d944218 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -32,6 +32,10 @@ static void 
check_new_cluster_subscription_configuration(void);
 static void check_old_cluster_for_valid_slots(void);
 static void check_old_cluster_subscription_state(void);
 
+static bool *data_type_check_results;
+static bool data_type_check_failed;
+static PQExpBufferData data_type_check_report;
+
 /*
  * DataTypesUsageChecks - definitions of data type checks for the old cluster
  * in order to determine if an upgrade can be performed.  See the comment on
@@ -314,6 +318,129 @@ static DataTypesUsageChecks data_types_usage_checks[] =
        }
 };
 
+static char *
+data_type_check_query(DbInfo *dbinfo, void *arg)
+{
+       int                     i = (int) (intptr_t) arg;
+       DataTypesUsageChecks *check = &data_types_usage_checks[i];
+
+       return psprintf("WITH RECURSIVE oids AS ( "
+       /* start with the type(s) returned by base_query */
+                                       "       %s "
+                                       "       UNION ALL "
+                                       "       SELECT * FROM ( "
+       /* inner WITH because we can only reference the CTE once */
+                                       "               WITH x AS (SELECT oid 
FROM oids) "
+       /* domains on any type selected so far */
+                                       "                       SELECT t.oid 
FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
+                                       "                       UNION ALL "
+       /* arrays over any type selected so far */
+                                       "                       SELECT t.oid 
FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
+                                       "                       UNION ALL "
+       /* composite types containing any type selected so far */
+                                       "                       SELECT t.oid 
FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
+                                       "                       WHERE t.typtype 
= 'c' AND "
+                                       "                                 t.oid 
= c.reltype AND "
+                                       "                                 c.oid 
= a.attrelid AND "
+                                       "                                 NOT 
a.attisdropped AND "
+                                       "                                 
a.atttypid = x.oid "
+                                       "                       UNION ALL "
+       /* ranges containing any type selected so far */
+                                       "                       SELECT t.oid 
FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
+                                       "                       WHERE t.typtype 
= 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
+                                       "       ) foo "
+                                       ") "
+       /* now look for stored columns of any such type */
+                                       "SELECT n.nspname, c.relname, a.attname 
"
+                                       "FROM   pg_catalog.pg_class c, "
+                                       "               pg_catalog.pg_namespace 
n, "
+                                       "               pg_catalog.pg_attribute 
a "
+                                       "WHERE  c.oid = a.attrelid AND "
+                                       "               NOT a.attisdropped AND "
+                                       "               a.atttypid IN (SELECT 
oid FROM oids) AND "
+                                       "               c.relkind IN ("
+                                       CppAsString2(RELKIND_RELATION) ", "
+                                       CppAsString2(RELKIND_MATVIEW) ", "
+                                       CppAsString2(RELKIND_INDEX) ") AND "
+                                       "               c.relnamespace = n.oid 
AND "
+       /* exclude possible orphaned temp tables */
+                                       "               n.nspname !~ 
'^pg_temp_' AND "
+                                       "               n.nspname !~ 
'^pg_toast_temp_' AND "
+       /* exclude system catalogs, too */
+                                       "               n.nspname NOT IN 
('pg_catalog', 'information_schema')",
+                                       check->base_query);
+}
+
+static void
+data_type_check_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       int                     i = (int) (intptr_t) arg;
+       DataTypesUsageChecks *check = &data_types_usage_checks[i];
+       int                     ntups = PQntuples(res);
+
+       if (ntups)
+       {
+               char            output_path[MAXPGPATH];
+               int                     i_nspname;
+               int                     i_relname;
+               int                     i_attname;
+               FILE       *script = NULL;
+               bool            db_used = false;
+
+               snprintf(output_path, sizeof(output_path), "%s/%s",
+                                log_opts.basedir,
+                                check->report_filename);
+
+               /*
+                * Make sure we have a buffer to save reports to now that we 
found a
+                * first failing check.
+                */
+               if (!data_type_check_failed)
+                       initPQExpBuffer(&data_type_check_report);
+               data_type_check_failed = true;
+
+               /*
+                * If this is the first time we see an error for the check in 
question
+                * then print a status message of the failure.
+                */
+               if (!data_type_check_results[i])
+               {
+                       pg_log(PG_REPORT, "    failed check: %s", 
_(check->status));
+                       appendPQExpBuffer(&data_type_check_report, "\n%s\n%s    
%s\n",
+                                                         _(check->report_text),
+                                                         _("A list of the 
problem columns is in the file:"),
+                                                         output_path);
+               }
+               data_type_check_results[i] = true;
+
+               i_nspname = PQfnumber(res, "nspname");
+               i_relname = PQfnumber(res, "relname");
+               i_attname = PQfnumber(res, "attname");
+
+               for (int rowno = 0; rowno < ntups; rowno++)
+               {
+                       if (script == NULL && (script = fopen_priv(output_path, 
"a")) == NULL)
+                               pg_fatal("could not open file \"%s\": %m", 
output_path);
+
+                       if (!db_used)
+                       {
+                               fprintf(script, "In database: %s\n", 
dbinfo->db_name);
+                               db_used = true;
+                       }
+                       fprintf(script, "  %s.%s.%s\n",
+                                       PQgetvalue(res, rowno, i_nspname),
+                                       PQgetvalue(res, rowno, i_relname),
+                                       PQgetvalue(res, rowno, i_attname));
+               }
+
+               if (script)
+               {
+                       fclose(script);
+                       script = NULL;
+               }
+       }
+}
+
 /*
  * check_for_data_types_usage()
  *     Detect whether there are any stored columns depending on given type(s)
@@ -337,10 +464,9 @@ static void
 check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks)
 {
        bool            found = false;
-       bool       *results;
-       PQExpBufferData report;
        DataTypesUsageChecks *tmp = checks;
        int                     n_data_types_usage_checks = 0;
+       AsyncTask  *task = async_task_create();
 
        prep_status("Checking for data type usage");
 
@@ -352,176 +478,51 @@ check_for_data_types_usage(ClusterInfo *cluster, 
DataTypesUsageChecks *checks)
        }
 
        /* Prepare an array to store the results of checks in */
-       results = pg_malloc0(sizeof(bool) * n_data_types_usage_checks);
+       data_type_check_results = pg_malloc0(sizeof(bool) * 
n_data_types_usage_checks);
+       data_type_check_failed = false;
 
-       /*
-        * Connect to each database in the cluster and run all defined checks
-        * against that database before trying the next one.
-        */
-       for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+       for (int i = 0; i < n_data_types_usage_checks; i++)
        {
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
+               DataTypesUsageChecks *check = &checks[i];
 
-               for (int checknum = 0; checknum < n_data_types_usage_checks; 
checknum++)
+               if (check->threshold_version == MANUAL_CHECK)
                {
-                       PGresult   *res;
-                       int                     ntups;
-                       int                     i_nspname;
-                       int                     i_relname;
-                       int                     i_attname;
-                       FILE       *script = NULL;
-                       bool            db_used = false;
-                       char            output_path[MAXPGPATH];
-                       DataTypesUsageChecks *cur_check = &checks[checknum];
-
-                       if (cur_check->threshold_version == MANUAL_CHECK)
-                       {
-                               Assert(cur_check->version_hook);
-
-                               /*
-                                * Make sure that the check applies to the 
current cluster
-                                * version and skip if not. If no check hook 
has been defined
-                                * we run the check for all versions.
-                                */
-                               if (!cur_check->version_hook(cluster))
-                                       continue;
-                       }
-                       else if (cur_check->threshold_version != ALL_VERSIONS)
-                       {
-                               if (GET_MAJOR_VERSION(cluster->major_version) > 
cur_check->threshold_version)
-                                       continue;
-                       }
-                       else
-                               Assert(cur_check->threshold_version == 
ALL_VERSIONS);
-
-                       snprintf(output_path, sizeof(output_path), "%s/%s",
-                                        log_opts.basedir,
-                                        cur_check->report_filename);
-
-                       /*
-                        * The type(s) of interest might be wrapped in a 
domain, array,
-                        * composite, or range, and these container types can 
be nested
-                        * (to varying extents depending on server version, but 
that's not
-                        * of concern here).  To handle all these cases we need 
a
-                        * recursive CTE.
-                        */
-                       res = executeQueryOrDie(conn,
-                                                                       "WITH 
RECURSIVE oids AS ( "
-                       /* start with the type(s) returned by base_query */
-                                                                       "       
%s "
-                                                                       "       
UNION ALL "
-                                                                       "       
SELECT * FROM ( "
-                       /* inner WITH because we can only reference the CTE 
once */
-                                                                       "       
        WITH x AS (SELECT oid FROM oids) "
-                       /* domains on any type selected so far */
-                                                                       "       
                SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = 
x.oid AND typtype = 'd' "
-                                                                       "       
                UNION ALL "
-                       /* arrays over any type selected so far */
-                                                                       "       
                SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid 
AND typtype = 'b' "
-                                                                       "       
                UNION ALL "
-                       /* composite types containing any type selected so far 
*/
-                                                                       "       
                SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, 
pg_catalog.pg_attribute a, x "
-                                                                       "       
                WHERE t.typtype = 'c' AND "
-                                                                       "       
                          t.oid = c.reltype AND "
-                                                                       "       
                          c.oid = a.attrelid AND "
-                                                                       "       
                          NOT a.attisdropped AND "
-                                                                       "       
                          a.atttypid = x.oid "
-                                                                       "       
                UNION ALL "
-                       /* ranges containing any type selected so far */
-                                                                       "       
                SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, 
x "
-                                                                       "       
                WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = 
x.oid"
-                                                                       "       
) foo "
-                                                                       ") "
-                       /* now look for stored columns of any such type */
-                                                                       "SELECT 
n.nspname, c.relname, a.attname "
-                                                                       "FROM   
pg_catalog.pg_class c, "
-                                                                       "       
        pg_catalog.pg_namespace n, "
-                                                                       "       
        pg_catalog.pg_attribute a "
-                                                                       "WHERE  
c.oid = a.attrelid AND "
-                                                                       "       
        NOT a.attisdropped AND "
-                                                                       "       
        a.atttypid IN (SELECT oid FROM oids) AND "
-                                                                       "       
        c.relkind IN ("
-                                                                       
CppAsString2(RELKIND_RELATION) ", "
-                                                                       
CppAsString2(RELKIND_MATVIEW) ", "
-                                                                       
CppAsString2(RELKIND_INDEX) ") AND "
-                                                                       "       
        c.relnamespace = n.oid AND "
-                       /* exclude possible orphaned temp tables */
-                                                                       "       
        n.nspname !~ '^pg_temp_' AND "
-                                                                       "       
        n.nspname !~ '^pg_toast_temp_' AND "
-                       /* exclude system catalogs, too */
-                                                                       "       
        n.nspname NOT IN ('pg_catalog', 'information_schema')",
-                                                                       
cur_check->base_query);
-
-                       ntups = PQntuples(res);
+                       Assert(check->version_hook);
 
                        /*
-                        * The datatype was found, so extract the data and log 
to the
-                        * requested filename. We need to open the file for 
appending
-                        * since the check might have already found the type in 
another
-                        * database earlier in the loop.
+                        * Make sure that the check applies to the current 
cluster version
+                        * and skip it if not.
                         */
-                       if (ntups)
-                       {
-                               /*
-                                * Make sure we have a buffer to save reports 
to now that we
-                                * found a first failing check.
-                                */
-                               if (!found)
-                                       initPQExpBuffer(&report);
-                               found = true;
-
-                               /*
-                                * If this is the first time we see an error 
for the check in
-                                * question then print a status message of the 
failure.
-                                */
-                               if (!results[checknum])
-                               {
-                                       pg_log(PG_REPORT, "    failed check: 
%s", _(cur_check->status));
-                                       appendPQExpBuffer(&report, "\n%s\n%s    
%s\n",
-                                                                         
_(cur_check->report_text),
-                                                                         _("A 
list of the problem columns is in the file:"),
-                                                                         
output_path);
-                               }
-                               results[checknum] = true;
-
-                               i_nspname = PQfnumber(res, "nspname");
-                               i_relname = PQfnumber(res, "relname");
-                               i_attname = PQfnumber(res, "attname");
-
-                               for (int rowno = 0; rowno < ntups; rowno++)
-                               {
-                                       if (script == NULL && (script = 
fopen_priv(output_path, "a")) == NULL)
-                                               pg_fatal("could not open file 
\"%s\": %m", output_path);
-
-                                       if (!db_used)
-                                       {
-                                               fprintf(script, "In database: 
%s\n", active_db->db_name);
-                                               db_used = true;
-                                       }
-                                       fprintf(script, "  %s.%s.%s\n",
-                                                       PQgetvalue(res, rowno, 
i_nspname),
-                                                       PQgetvalue(res, rowno, 
i_relname),
-                                                       PQgetvalue(res, rowno, 
i_attname));
-                               }
-
-                               if (script)
-                               {
-                                       fclose(script);
-                                       script = NULL;
-                               }
-                       }
-
-                       PQclear(res);
+                       if (!check->version_hook(cluster))
+                               continue;
                }
+               else if (check->threshold_version != ALL_VERSIONS)
+               {
+                       if (GET_MAJOR_VERSION(cluster->major_version) > 
check->threshold_version)
+                               continue;
+               }
+               else
+                       Assert(check->threshold_version == ALL_VERSIONS);
 
-               PQfinish(conn);
+               async_task_add_step(task, data_type_check_query,
+                                                       
data_type_check_process, true,
+                                                       (void *) (intptr_t) i);
        }
 
+       /*
+        * Run all of the checks selected above against the databases in the
+        * cluster.  Failures are recorded by data_type_check_process().
+        */
+       async_task_run(task, cluster);
+       async_task_free(task);
+       found = data_type_check_failed; /* set by data_type_check_process() */
+
        if (found)
-               pg_fatal("Data type checks failed: %s", report.data);
+       {
+               pg_fatal("Data type checks failed: %s", 
data_type_check_report.data);
+       }
 
-       pg_free(results);
+       pg_free(data_type_check_results);
 
        check_ok();
 }
-- 
2.39.3 (Apple Git-146)

>From 329b10464dcd377cc56a8eeb317a9c8e01ec8620 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 8 Jul 2024 21:00:20 -0500
Subject: [PATCH v4 08/12] parallelize isn and int8 passing mismatch check in
 pg_upgrade

---
 src/bin/pg_upgrade/check.c | 85 ++++++++++++++++++++------------------
 1 file changed, 44 insertions(+), 41 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 800d944218..beb3fc9433 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1193,6 +1193,44 @@ check_for_prepared_transactions(ClusterInfo *cluster)
        check_ok();
 }
 
+static char *
+isn_and_int8_passing_mismatch_query(DbInfo *dbinfo, void *arg)
+{
+       return pg_strdup("SELECT n.nspname, p.proname "
+                                        "FROM   pg_catalog.pg_proc p, "
+                                        "       pg_catalog.pg_namespace n "
+                                        "WHERE  p.pronamespace = n.oid AND "
+                                        "       p.probin = '$libdir/isn'");
+}
+
+static void
+isn_and_int8_passing_mismatch_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       int                     i_nspname = PQfnumber(res, "nspname");
+       int                     i_proname = PQfnumber(res, "proname");
+       FILE      **script = (FILE **) arg;
+       char            output_path[MAXPGPATH];
+
+       snprintf(output_path, sizeof(output_path), "%s/%s",
+                        log_opts.basedir,
+                        "contrib_isn_and_int8_pass_by_value.txt");
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (*script == NULL && (*script = fopen_priv(output_path, "w")) 
== NULL)
+                       pg_fatal("could not open file \"%s\": %m", output_path);
+               if (!db_used)
+               {
+                       fprintf(*script, "In database: %s\n", dbinfo->db_name);
+                       db_used = true;
+               }
+               fprintf(*script, "  %s.%s\n",
+                               PQgetvalue(res, rowno, i_nspname),
+                               PQgetvalue(res, rowno, i_proname));
+       }
+}
 
 /*
  *     check_for_isn_and_int8_passing_mismatch()
@@ -1204,8 +1242,8 @@ check_for_prepared_transactions(ClusterInfo *cluster)
 static void
 check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
 {
-       int                     dbnum;
        FILE       *script = NULL;
+       AsyncTask  *task;
        char            output_path[MAXPGPATH];
 
        prep_status("Checking for contrib/isn with bigint-passing mismatch");
@@ -1222,46 +1260,11 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo 
*cluster)
                         log_opts.basedir,
                         "contrib_isn_and_int8_pass_by_value.txt");
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_nspname,
-                                       i_proname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /* Find any functions coming from contrib/isn */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT 
n.nspname, p.proname "
-                                                               "FROM   
pg_catalog.pg_proc p, "
-                                                               "               
pg_catalog.pg_namespace n "
-                                                               "WHERE  
p.pronamespace = n.oid AND "
-                                                               "               
p.probin = '$libdir/isn'");
-
-               ntups = PQntuples(res);
-               i_nspname = PQfnumber(res, "nspname");
-               i_proname = PQfnumber(res, "proname");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL && (script = fopen_priv(output_path, 
"w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", 
active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  %s.%s\n",
-                                       PQgetvalue(res, rowno, i_nspname),
-                                       PQgetvalue(res, rowno, i_proname));
-               }
-
-               PQclear(res);
-
-               PQfinish(conn);
-       }
+       task = async_task_create();
+       async_task_add_step(task, isn_and_int8_passing_mismatch_query,
+                                               
isn_and_int8_passing_mismatch_process, true, &script);
+       async_task_run(task, cluster);
+       async_task_free(task);
 
        if (script)
        {
-- 
2.39.3 (Apple Git-146)

>From 102a1146b421dd0218d61586d2b4b65e5dc4cd86 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 8 Jul 2024 21:12:49 -0500
Subject: [PATCH v4 09/12] parallelize user defined postfix ops check in
 pg_upgrade

---
 src/bin/pg_upgrade/check.c | 134 +++++++++++++++++++------------------
 1 file changed, 69 insertions(+), 65 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index beb3fc9433..6b61db65c5 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1282,15 +1282,79 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
                check_ok();
 }
 
+/*
+ * Build the query that lists user-defined postfix operators in a database.
+ */
+static char *
+user_defined_postfix_ops_query(DbInfo *dbinfo, void *arg)
+{
+       /*
+        * 16384 is FirstNormalObjectId written out literally instead of being
+        * interpolated from the C #define: should that #define ever change, the
+        * cutoff wanted here is still the value used by pre-version 14 servers,
+        * not that of some future version.
+        */
+       static const char *const query =
+               "SELECT o.oid AS oproid, "
+               "       n.nspname AS oprnsp, "
+               "       o.oprname, "
+               "       tn.nspname AS typnsp, "
+               "       t.typname "
+               "FROM pg_catalog.pg_operator o, "
+               "     pg_catalog.pg_namespace n, "
+               "     pg_catalog.pg_type t, "
+               "     pg_catalog.pg_namespace tn "
+               "WHERE o.oprnamespace = n.oid AND "
+               "      o.oprleft = t.oid AND "
+               "      t.typnamespace = tn.oid AND "
+               "      o.oprright = 0 AND "
+               "      o.oid >= 16384";
+
+       /* Hand back a pg_strdup'd copy, as the original did. */
+       return pg_strdup(query);
+}
+
+/*
+ * Write the postfix operators found in one database to the report file whose
+ * handle is passed (by address) in arg.
+ */
+static void
+user_defined_postfix_ops_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       FILE      **report = (FILE **) arg;
+       int                     nrows = PQntuples(res);
+       int                     col_oproid = PQfnumber(res, "oproid");
+       int                     col_oprnsp = PQfnumber(res, "oprnsp");
+       int                     col_oprname = PQfnumber(res, "oprname");
+       int                     col_typnsp = PQfnumber(res, "typnsp");
+       int                     col_typname = PQfnumber(res, "typname");
+
+       /* A database without matches contributes nothing to the report. */
+       if (nrows == 0)
+               return;
+
+       /* Create the report on first use; later calls find *report already set. */
+       if (*report == NULL)
+       {
+               char            path[MAXPGPATH];
+
+               snprintf(path, sizeof(path), "%s/%s",
+                                log_opts.basedir,
+                                "postfix_ops.txt");
+               if ((*report = fopen_priv(path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", path);
+       }
+
+       /* One heading per database, followed by one line per operator. */
+       fprintf(*report, "In database: %s\n", dbinfo->db_name);
+       for (int row = 0; row < nrows; row++)
+               fprintf(*report, "  (oid=%s) %s.%s (%s.%s, NONE)\n",
+                               PQgetvalue(res, row, col_oproid),
+                               PQgetvalue(res, row, col_oprnsp),
+                               PQgetvalue(res, row, col_oprname),
+                               PQgetvalue(res, row, col_typnsp),
+                               PQgetvalue(res, row, col_typname));
+}
+
 /*
  * Verify that no user defined postfix operators exist.
  */
 static void
 check_for_user_defined_postfix_ops(ClusterInfo *cluster)
 {
-       int                     dbnum;
        FILE       *script = NULL;
        char            output_path[MAXPGPATH];
+       AsyncTask  *task = async_task_create();
 
        prep_status("Checking for user-defined postfix operators");
 
@@ -1298,70 +1362,10 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
                         log_opts.basedir,
                         "postfix_ops.txt");
 
-       /* Find any user defined postfix operators */
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_oproid,
-                                       i_oprnsp,
-                                       i_oprname,
-                                       i_typnsp,
-                                       i_typname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /*
-                * The query below hardcodes FirstNormalObjectId as 16384 
rather than
-                * interpolating that C #define into the query because, if that
-                * #define is ever changed, the cutoff we want to use is the 
value
-                * used by pre-version 14 servers, not that of some future 
version.
-                */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT o.oid 
AS oproid, "
-                                                               "       
n.nspname AS oprnsp, "
-                                                               "       
o.oprname, "
-                                                               "       
tn.nspname AS typnsp, "
-                                                               "       
t.typname "
-                                                               "FROM 
pg_catalog.pg_operator o, "
-                                                               "     
pg_catalog.pg_namespace n, "
-                                                               "     
pg_catalog.pg_type t, "
-                                                               "     
pg_catalog.pg_namespace tn "
-                                                               "WHERE 
o.oprnamespace = n.oid AND "
-                                                               "      
o.oprleft = t.oid AND "
-                                                               "      
t.typnamespace = tn.oid AND "
-                                                               "      
o.oprright = 0 AND "
-                                                               "      o.oid >= 
16384");
-               ntups = PQntuples(res);
-               i_oproid = PQfnumber(res, "oproid");
-               i_oprnsp = PQfnumber(res, "oprnsp");
-               i_oprname = PQfnumber(res, "oprname");
-               i_typnsp = PQfnumber(res, "typnsp");
-               i_typname = PQfnumber(res, "typname");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL &&
-                               (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", 
active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  (oid=%s) %s.%s (%s.%s, NONE)\n",
-                                       PQgetvalue(res, rowno, i_oproid),
-                                       PQgetvalue(res, rowno, i_oprnsp),
-                                       PQgetvalue(res, rowno, i_oprname),
-                                       PQgetvalue(res, rowno, i_typnsp),
-                                       PQgetvalue(res, rowno, i_typname));
-               }
-
-               PQclear(res);
-
-               PQfinish(conn);
-       }
+       async_task_add_step(task, user_defined_postfix_ops_query,
+                                               
user_defined_postfix_ops_process, true, &script);
+       async_task_run(task, cluster);
+       async_task_free(task);
 
        if (script)
        {
-- 
2.39.3 (Apple Git-146)

>From 679bf95bea485149173543b6a89b0bf98209b40a Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 8 Jul 2024 21:30:19 -0500
Subject: [PATCH v4 10/12] parallelize incompatible polymorphics check in
 pg_upgrade

---
 src/bin/pg_upgrade/check.c       | 174 ++++++++++++++++---------------
 src/tools/pgindent/typedefs.list |   1 +
 2 files changed, 92 insertions(+), 83 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 6b61db65c5..6c11732e70 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1381,6 +1381,81 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
                check_ok();
 }
 
+typedef struct incompat_polymorphics_state
+{
+       FILE       *script;     /* report file; stays NULL until a row is found */
+       PQExpBufferData old_polymorphics;       /* quoted function list for the query */
+       char            output_path[MAXPGPATH]; /* path of the report file */
+} incompat_polymorphics_state;
+
+/*
+ * Build the query reporting user-defined aggregates and operators that refer
+ * to the polymorphic functions listed in the state passed via arg.
+ */
+static char *
+incompat_polymorphics_query(DbInfo *dbinfo, void *arg)
+{
+       incompat_polymorphics_state *st = (incompat_polymorphics_state *) arg;
+       const char *funcs = st->old_polymorphics.data;
+
+       /*
+        * 16384 below is FirstNormalObjectId written out literally rather than
+        * interpolated from the C #define: if that #define is ever changed, the
+        * cutoff we want to use is the value used by pre-version 14 servers, not
+        * that of some future version.
+        *
+        * The same function list is substituted into each of the three branches.
+        */
+       return psprintf(
+       /* Aggregate transition functions */
+               "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
+               "FROM pg_proc AS p "
+               "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
+               "JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
+               "WHERE p.oid >= 16384 "
+               "AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
+               "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+       /* Aggregate final functions */
+               "UNION ALL "
+               "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
+               "FROM pg_proc AS p "
+               "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
+               "JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
+               "WHERE p.oid >= 16384 "
+               "AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
+               "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+       /* Operators */
+               "UNION ALL "
+               "SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
+               "FROM pg_operator AS op "
+               "WHERE op.oid >= 16384 "
+               "AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
+               "AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[]);",
+               funcs, funcs, funcs);
+}
+
+/*
+ * Write the incompatible objects found in one database to the report file
+ * tracked in the state passed via arg.
+ */
+static void
+incompat_polymorphics_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       incompat_polymorphics_state *st = (incompat_polymorphics_state *) arg;
+       int                     nrows = PQntuples(res);
+       int                     col_kind = PQfnumber(res, "objkind");
+       int                     col_name = PQfnumber(res, "objname");
+
+       /* No offending objects in this database: leave the report untouched. */
+       if (nrows == 0)
+               return;
+
+       /* Open the report lazily; st->script stays NULL until a row is found. */
+       if (st->script == NULL &&
+               (st->script = fopen_priv(st->output_path, "w")) == NULL)
+               pg_fatal("could not open file \"%s\": %m", st->output_path);
+
+       /* One heading per database, then one line per offending object. */
+       fprintf(st->script, "In database: %s\n", dbinfo->db_name);
+       for (int row = 0; row < nrows; row++)
+               fprintf(st->script, "  %s: %s\n",
+                               PQgetvalue(res, row, col_kind),
+                               PQgetvalue(res, row, col_name));
+}
+
 /*
  *     check_for_incompatible_polymorphics()
  *
@@ -1390,111 +1465,44 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
 static void
 check_for_incompatible_polymorphics(ClusterInfo *cluster)
 {
-       PGresult   *res;
-       FILE       *script = NULL;
-       char            output_path[MAXPGPATH];
-       PQExpBufferData old_polymorphics;
+       AsyncTask  *task = async_task_create();
+       incompat_polymorphics_state state;
 
        prep_status("Checking for incompatible polymorphic functions");
 
-       snprintf(output_path, sizeof(output_path), "%s/%s",
+       state.script = NULL;
+       snprintf(state.output_path, sizeof(state.output_path), "%s/%s",
                         log_opts.basedir,
                         "incompatible_polymorphics.txt");
 
        /* The set of problematic functions varies a bit in different versions 
*/
-       initPQExpBuffer(&old_polymorphics);
+       initPQExpBuffer(&state.old_polymorphics);
 
-       appendPQExpBufferStr(&old_polymorphics,
+       appendPQExpBufferStr(&state.old_polymorphics,
                                                 
"'array_append(anyarray,anyelement)'"
                                                 ", 
'array_cat(anyarray,anyarray)'"
                                                 ", 
'array_prepend(anyelement,anyarray)'");
 
        if (GET_MAJOR_VERSION(cluster->major_version) >= 903)
-               appendPQExpBufferStr(&old_polymorphics,
+               appendPQExpBufferStr(&state.old_polymorphics,
                                                         ", 
'array_remove(anyarray,anyelement)'"
                                                         ", 
'array_replace(anyarray,anyelement,anyelement)'");
 
        if (GET_MAJOR_VERSION(cluster->major_version) >= 905)
-               appendPQExpBufferStr(&old_polymorphics,
+               appendPQExpBufferStr(&state.old_polymorphics,
                                                         ", 
'array_position(anyarray,anyelement)'"
                                                         ", 
'array_position(anyarray,anyelement,integer)'"
                                                         ", 
'array_positions(anyarray,anyelement)'"
                                                         ", 
'width_bucket(anyelement,anyarray)'");
 
-       for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               bool            db_used = false;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-               int                     ntups;
-               int                     i_objkind,
-                                       i_objname;
-
-               /*
-                * The query below hardcodes FirstNormalObjectId as 16384 
rather than
-                * interpolating that C #define into the query because, if that
-                * #define is ever changed, the cutoff we want to use is the 
value
-                * used by pre-version 14 servers, not that of some future 
version.
-                */
-               res = executeQueryOrDie(conn,
-               /* Aggregate transition functions */
-                                                               "SELECT 
'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
-                                                               "FROM pg_proc 
AS p "
-                                                               "JOIN 
pg_aggregate AS a ON a.aggfnoid=p.oid "
-                                                               "JOIN pg_proc 
AS transfn ON transfn.oid=a.aggtransfn "
-                                                               "WHERE p.oid >= 
16384 "
-                                                               "AND 
a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
-                                                               "AND 
a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
-
-               /* Aggregate final functions */
-                                                               "UNION ALL "
-                                                               "SELECT 
'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
-                                                               "FROM pg_proc 
AS p "
-                                                               "JOIN 
pg_aggregate AS a ON a.aggfnoid=p.oid "
-                                                               "JOIN pg_proc 
AS finalfn ON finalfn.oid=a.aggfinalfn "
-                                                               "WHERE p.oid >= 
16384 "
-                                                               "AND 
a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
-                                                               "AND 
a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
-
-               /* Operators */
-                                                               "UNION ALL "
-                                                               "SELECT 
'operator' AS objkind, op.oid::regoperator::text AS objname "
-                                                               "FROM 
pg_operator AS op "
-                                                               "WHERE op.oid 
>= 16384 "
-                                                               "AND oprcode = 
ANY(ARRAY[%s]::regprocedure[]) "
-                                                               "AND oprleft = 
ANY(ARRAY['anyarray', 'anyelement']::regtype[]);",
-                                                               
old_polymorphics.data,
-                                                               
old_polymorphics.data,
-                                                               
old_polymorphics.data);
-
-               ntups = PQntuples(res);
-
-               i_objkind = PQfnumber(res, "objkind");
-               i_objname = PQfnumber(res, "objname");
-
-               for (int rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL &&
-                               (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", 
active_db->db_name);
-                               db_used = true;
-                       }
-
-                       fprintf(script, "  %s: %s\n",
-                                       PQgetvalue(res, rowno, i_objkind),
-                                       PQgetvalue(res, rowno, i_objname));
-               }
-
-               PQclear(res);
-               PQfinish(conn);
-       }
+       async_task_add_step(task, incompat_polymorphics_query,
+                                               incompat_polymorphics_process, 
true, &state);
+       async_task_run(task, cluster);
+       async_task_free(task);
 
-       if (script)
+       if (state.script)
        {
-               fclose(script);
+               fclose(state.script);
                pg_log(PG_REPORT, "fatal");
                pg_fatal("Your installation contains user-defined objects that 
refer to internal\n"
                                 "polymorphic functions with arguments of type 
\"anyarray\" or \"anyelement\".\n"
@@ -1502,12 +1510,12 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
                                 "afterwards, changing them to refer to the new 
corresponding functions with\n"
                                 "arguments of type \"anycompatiblearray\" and 
\"anycompatible\".\n"
                                 "A list of the problematic objects is in the 
file:\n"
-                                "    %s", output_path);
+                                "    %s", state.output_path);
        }
        else
                check_ok();
 
-       termPQExpBuffer(&old_polymorphics);
+       termPQExpBuffer(&state.old_polymorphics);
 }
 
 /*
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index bbd8827c2d..8555c2c6bd 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3535,6 +3535,7 @@ hstoreUpgrade_t
 hyperLogLogState
 ifState
 import_error_callback_arg
+incompat_polymorphics_state
 indexed_tlist
 inet
 inetKEY
-- 
2.39.3 (Apple Git-146)

>From 0c51fe8e3ddfaa59c24dd5351a55d025a8fb2681 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 8 Jul 2024 21:42:22 -0500
Subject: [PATCH v4 11/12] parallelize tables with oids check in pg_upgrade

---
 src/bin/pg_upgrade/check.c | 90 ++++++++++++++++++++------------------
 1 file changed, 48 insertions(+), 42 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 6c11732e70..639a62d426 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1518,15 +1518,58 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
        termPQExpBuffer(&state.old_polymorphics);
 }
 
+/*
+ * Build the query listing tables outside pg_catalog that have relhasoids set.
+ */
+static char *
+with_oids_query(DbInfo *dbinfo, void *arg)
+{
+       static const char *const query =
+               "SELECT n.nspname, c.relname "
+               "FROM   pg_catalog.pg_class c, "
+               "       pg_catalog.pg_namespace n "
+               "WHERE  c.relnamespace = n.oid AND "
+               "       c.relhasoids AND"
+               "       n.nspname NOT IN ('pg_catalog')";
+
+       /* Hand back a pg_strdup'd copy, as the original did. */
+       return pg_strdup(query);
+}
+
+/*
+ * Write the WITH OIDS tables found in one database to the report file whose
+ * handle is passed (by address) in arg.
+ */
+static void
+with_oids_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       FILE      **report = (FILE **) arg;
+       int                     nrows = PQntuples(res);
+       int                     col_nspname = PQfnumber(res, "nspname");
+       int                     col_relname = PQfnumber(res, "relname");
+
+       /* A database without matches contributes nothing to the report. */
+       if (nrows == 0)
+               return;
+
+       /* Create the report on first use; later calls find *report already set. */
+       if (*report == NULL)
+       {
+               char            path[MAXPGPATH];
+
+               snprintf(path, sizeof(path), "%s/%s",
+                                log_opts.basedir,
+                                "tables_with_oids.txt");
+               if ((*report = fopen_priv(path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", path);
+       }
+
+       /* One heading per database, followed by one line per table. */
+       fprintf(*report, "In database: %s\n", dbinfo->db_name);
+       for (int row = 0; row < nrows; row++)
+               fprintf(*report, "  %s.%s\n",
+                               PQgetvalue(res, row, col_nspname),
+                               PQgetvalue(res, row, col_relname));
+}
+
 /*
  * Verify that no tables are declared WITH OIDS.
  */
 static void
 check_for_tables_with_oids(ClusterInfo *cluster)
 {
-       int                     dbnum;
        FILE       *script = NULL;
        char            output_path[MAXPGPATH];
+       AsyncTask  *task = async_task_create();
 
        prep_status("Checking for tables WITH OIDS");
 
@@ -1534,47 +1577,10 @@ check_for_tables_with_oids(ClusterInfo *cluster)
                         log_opts.basedir,
                         "tables_with_oids.txt");
 
-       /* Find any tables declared WITH OIDS */
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_nspname,
-                                       i_relname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               res = executeQueryOrDie(conn,
-                                                               "SELECT 
n.nspname, c.relname "
-                                                               "FROM   
pg_catalog.pg_class c, "
-                                                               "               
pg_catalog.pg_namespace n "
-                                                               "WHERE  
c.relnamespace = n.oid AND "
-                                                               "               
c.relhasoids AND"
-                                                               "       
n.nspname NOT IN ('pg_catalog')");
-
-               ntups = PQntuples(res);
-               i_nspname = PQfnumber(res, "nspname");
-               i_relname = PQfnumber(res, "relname");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL && (script = fopen_priv(output_path, 
"w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", 
active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  %s.%s\n",
-                                       PQgetvalue(res, rowno, i_nspname),
-                                       PQgetvalue(res, rowno, i_relname));
-               }
-
-               PQclear(res);
-
-               PQfinish(conn);
-       }
+       async_task_add_step(task, with_oids_query,
+                                               with_oids_process, true, 
&script);
+       async_task_run(task, cluster);
+       async_task_free(task);
 
        if (script)
        {
-- 
2.39.3 (Apple Git-146)

>From 9c64682938929eb7da0d901213471c14b7fb0227 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 8 Jul 2024 21:52:13 -0500
Subject: [PATCH v4 12/12] parallelize user defined encoding conversions check
 in pg_upgrade

---
 src/bin/pg_upgrade/check.c | 107 ++++++++++++++++++++-----------------
 1 file changed, 57 insertions(+), 50 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 639a62d426..b20651c851 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1654,15 +1654,66 @@ check_for_pg_role_prefix(ClusterInfo *cluster)
                check_ok();
 }
 
+/*
+ * Build the query listing user-defined encoding conversions in a database.
+ *
+ * The string is returned via pg_strdup(), the allocator the other query
+ * callbacks in this file use, rather than pstrdup().
+ */
+static char *
+user_defined_encoding_conversions_query(DbInfo *dbinfo, void *arg)
+{
+       /*
+        * The query below hardcodes FirstNormalObjectId as 16384 rather than
+        * interpolating that C #define into the query because, if that #define is
+        * ever changed, the cutoff we want to use is the value used by
+        * pre-version 14 servers, not that of some future version.
+        */
+       return pg_strdup("SELECT c.oid as conoid, c.conname, n.nspname "
+                                        "FROM pg_catalog.pg_conversion c, "
+                                        "     pg_catalog.pg_namespace n "
+                                        "WHERE c.connamespace = n.oid AND "
+                                        "      c.oid >= 16384");
+}
+
+/*
+ * Write the encoding conversions found in one database to the report file
+ * whose handle is passed (by address) in arg.
+ */
+static void
+user_defined_encoding_conversions_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       FILE      **report = (FILE **) arg;
+       int                     nrows = PQntuples(res);
+       int                     col_conoid = PQfnumber(res, "conoid");
+       int                     col_conname = PQfnumber(res, "conname");
+       int                     col_nspname = PQfnumber(res, "nspname");
+
+       /* A database without matches contributes nothing to the report. */
+       if (nrows == 0)
+               return;
+
+       /* Create the report on first use; later calls find *report already set. */
+       if (*report == NULL)
+       {
+               char            path[MAXPGPATH];
+
+               snprintf(path, sizeof(path), "%s/%s",
+                                log_opts.basedir,
+                                "encoding_conversions.txt");
+               if ((*report = fopen_priv(path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", path);
+       }
+
+       /* One heading per database, followed by one line per conversion. */
+       fprintf(*report, "In database: %s\n", dbinfo->db_name);
+       for (int row = 0; row < nrows; row++)
+               fprintf(*report, "  (oid=%s) %s.%s\n",
+                               PQgetvalue(res, row, col_conoid),
+                               PQgetvalue(res, row, col_nspname),
+                               PQgetvalue(res, row, col_conname));
+}
+
 /*
  * Verify that no user-defined encoding conversions exist.
  */
 static void
 check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
 {
-       int                     dbnum;
        FILE       *script = NULL;
        char            output_path[MAXPGPATH];
+       AsyncTask  *task = async_task_create();
 
        prep_status("Checking for user-defined encoding conversions");
 
@@ -1670,55 +1721,11 @@ check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
                         log_opts.basedir,
                         "encoding_conversions.txt");
 
-       /* Find any user defined encoding conversions */
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_conoid,
-                                       i_conname,
-                                       i_nspname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /*
-                * The query below hardcodes FirstNormalObjectId as 16384 
rather than
-                * interpolating that C #define into the query because, if that
-                * #define is ever changed, the cutoff we want to use is the 
value
-                * used by pre-version 14 servers, not that of some future 
version.
-                */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT c.oid 
as conoid, c.conname, n.nspname "
-                                                               "FROM 
pg_catalog.pg_conversion c, "
-                                                               "     
pg_catalog.pg_namespace n "
-                                                               "WHERE 
c.connamespace = n.oid AND "
-                                                               "      c.oid >= 
16384");
-               ntups = PQntuples(res);
-               i_conoid = PQfnumber(res, "conoid");
-               i_conname = PQfnumber(res, "conname");
-               i_nspname = PQfnumber(res, "nspname");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL &&
-                               (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", 
active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  (oid=%s) %s.%s\n",
-                                       PQgetvalue(res, rowno, i_conoid),
-                                       PQgetvalue(res, rowno, i_nspname),
-                                       PQgetvalue(res, rowno, i_conname));
-               }
-
-               PQclear(res);
-
-               PQfinish(conn);
-       }
+       async_task_add_step(task, user_defined_encoding_conversions_query,
+                                               
user_defined_encoding_conversions_process, true,
+                                               &script);
+       async_task_run(task, cluster);
+       async_task_free(task);
 
        if (script)
        {
-- 
2.39.3 (Apple Git-146)

Reply via email to