On Sat, Aug 10, 2024 at 10:35:46AM -0500, Nathan Bossart wrote:
> Another option might be to combine all the queries for a task into a single
> string and then send that in one PQsendQuery() call.  That may be a simpler
> way to eliminate the time between queries.

I tried this out and didn't see any performance difference in my tests.
However, the idea seems sound, and it let me remove ~40 lines of code, both
by combining the queries and by making the search_path query an implicit
first step (instead of its own state).  So, here's a v9 of the patch set
with those changes.
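
For anyone following along, the libpq behavior this relies on is that a
single PQsendQuery() call may contain several semicolon-separated
statements, and the server then returns one PGresult per statement.  A
minimal, blocking sketch of that pattern follows (run_combined_queries is
a made-up name, and real code would interleave PQconsumeInput()/PQisBusy()
as the patch does instead of blocking in PQgetResult()):

    #include "postgres_fe.h"

    #include "pg_upgrade.h"        /* for pg_fatal() and libpq */

    static void
    run_combined_queries(PGconn *conn)
    {
        PGresult   *res;

        /* Two statements, one round trip to the server. */
        if (!PQsendQuery(conn, "SELECT 1; SELECT 2;"))
            pg_fatal("failed to send queries: %s", PQerrorMessage(conn));

        /* One PGresult per statement; NULL once all have been drained. */
        while ((res = PQgetResult(conn)) != NULL)
        {
            if (PQresultStatus(res) != PGRES_TUPLES_OK)
                pg_fatal("query failed: %s", PQerrorMessage(conn));
            /* ... process each result in statement order ... */
            PQclear(res);
        }
    }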

-- 
nathan
>From 8d8577b3dd1bbdbe584816dfb9045a5988862412 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 11:02:44 -0500
Subject: [PATCH v9 01/11] introduce framework for parallelizing pg_upgrade
 tasks

---
 src/bin/pg_upgrade/Makefile      |   1 +
 src/bin/pg_upgrade/async.c       | 425 +++++++++++++++++++++++++++++++
 src/bin/pg_upgrade/meson.build   |   1 +
 src/bin/pg_upgrade/pg_upgrade.h  |  14 +
 src/tools/pgindent/typedefs.list |   4 +
 5 files changed, 445 insertions(+)
 create mode 100644 src/bin/pg_upgrade/async.c

diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile
index bde91e2beb..3bc4f5d740 100644
--- a/src/bin/pg_upgrade/Makefile
+++ b/src/bin/pg_upgrade/Makefile
@@ -12,6 +12,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = \
        $(WIN32RES) \
+       async.o \
        check.o \
        controldata.o \
        dump.o \
diff --git a/src/bin/pg_upgrade/async.c b/src/bin/pg_upgrade/async.c
new file mode 100644
index 0000000000..18a0b01f79
--- /dev/null
+++ b/src/bin/pg_upgrade/async.c
@@ -0,0 +1,425 @@
+/*
+ * async.c
+ *             parallelization via libpq's async APIs
+ *
+ * This framework provides an efficient way of running the various
+ * once-in-each-database tasks required by pg_upgrade.  Specifically, it
+ * parallelizes these tasks by managing a simple state machine of
+ * user_opts.jobs slots and using libpq's asynchronous APIs to establish the
+ * connections and run the queries.  Callers simply need to create a callback
+ * function and build/execute an AsyncTask.  A simple example follows:
+ *
+ *             static void
+ *             my_process_cb(DbInfo *dbinfo, PGresult *res, void *arg)
+ *             {
+ *                     for (int i = 0; i < PQntuples(res); i++)
+ *                     {
+ *                             ... process results ...
+ *                     }
+ *             }
+ *
+ *             void
+ *             my_task(ClusterInfo *cluster)
+ *             {
+ *                     AsyncTask  *task = async_task_create();
+ *
+ *                     async_task_add_step(task,
+ *                                                             "... query text ...",
+ *                                                             my_process_cb,
+ *                                                             true,   // let the task free the PGresult
+ *                                                             NULL);  // "arg" pointer for the callbacks
+ *                     async_task_run(task, cluster);
+ *                     async_task_free(task);
+ *             }
+ *
+ * Note that multiple steps can be added to a given task.  When there are
+ * multiple steps, the task will run all of the steps consecutively in the same
+ * database connection before freeing the connection and moving on.  In other
+ * words, it only ever initiates one connection to each database in the
+ * cluster for a given run.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ * src/bin/pg_upgrade/async.c
+ */
+
+#include "postgres_fe.h"
+
+#include "common/connect.h"
+#include "fe_utils/string_utils.h"
+#include "pg_upgrade.h"
+
+/*
+ * dbs_complete stores the number of databases that we have completed
+ * processing.  When this value equals the number of databases in the cluster,
+ * the task is finished.
+ */
+static int     dbs_complete;
+
+/*
+ * dbs_processing stores the index of the next database in the cluster's array
+ * of databases that will be picked up for processing.  It will always be
+ * greater than or equal to dbs_complete.
+ */
+static int     dbs_processing;
+
+/*
+ * This struct stores all the information for a single step of a task.  All
+ * steps in a task are run in a single connection before moving on to the next
+ * database (which requires a new connection).
+ */
+typedef struct AsyncTaskCallbacks
+{
+       AsyncTaskProcessCB process_cb;  /* processes the results of the query */
+       const char *query;                      /* query text */
+       bool            free_result;    /* should we free the result? */
+       void       *arg;                        /* pointer passed to each callback */
+} AsyncTaskCallbacks;
+
+/*
+ * This struct is a thin wrapper around an array of steps, i.e.,
+ * AsyncTaskCallbacks.
+ */
+typedef struct AsyncTask
+{
+       AsyncTaskCallbacks *cbs;
+       int                     num_cb_sets;
+} AsyncTask;
+
+/*
+ * The different states for a parallel slot.
+ */
+typedef enum
+{
+       FREE,                                           /* slot available for use in a new database */
+       CONNECTING,                                     /* waiting for connection to be established */
+       RUNNING_QUERIES,                        /* running/processing queries in the task */
+} AsyncSlotState;
+
+/*
+ * We maintain an array of user_opts.jobs slots to execute the task.
+ */
+typedef struct
+{
+       AsyncSlotState state;           /* state of the slot */
+       int                     db;                             /* index of the database assigned to slot */
+       int                     query;                  /* index of the current query to process */
+       PGconn     *conn;                       /* current connection managed by slot */
+} AsyncSlot;
+
+/*
+ * Initializes an AsyncTask.
+ */
+AsyncTask *
+async_task_create(void)
+{
+       AsyncTask  *task = pg_malloc0(sizeof(AsyncTask));
+
+       /* All tasks must first set a secure search_path. */
+       async_task_add_step(task, ALWAYS_SECURE_SEARCH_PATH_SQL, NULL, true, NULL);
+       return task;
+}
+
+/*
+ * Frees all storage associated with an AsyncTask.
+ */
+void
+async_task_free(AsyncTask *task)
+{
+       if (task->cbs)
+               pg_free(task->cbs);
+
+       pg_free(task);
+}
+
+/*
+ * Adds a step to an AsyncTask.  The steps will be executed in each database in
+ * the order in which they are added.
+ *
+ *     task: task object that must have been initialized via async_task_create()
+ *     query: the query text
+ *     process_cb: function that processes the results of the query
+ *     free_result: should we free the PGresult, or leave it to the caller?
+ *     arg: pointer to task-specific data that is passed to each callback
+ */
+void
+async_task_add_step(AsyncTask *task, const char *query,
+                                       AsyncTaskProcessCB process_cb, bool free_result,
+                                       void *arg)
+{
+       AsyncTaskCallbacks *new_cbs;
+
+       task->cbs = pg_realloc(task->cbs,
+                                                  ++task->num_cb_sets * sizeof(AsyncTaskCallbacks));
+
+       new_cbs = &task->cbs[task->num_cb_sets - 1];
+       new_cbs->process_cb = process_cb;
+       new_cbs->query = query;
+       new_cbs->free_result = free_result;
+       new_cbs->arg = arg;
+}
+
+/*
+ * A simple wrapper around a pg_fatal() that includes the error message for the
+ * connection.
+ */
+static void
+conn_failure(PGconn *conn)
+{
+       pg_fatal("connection failure: %s", PQerrorMessage(conn));
+}
+
+/*
+ * Build a connection string for the slot's current database and asynchronously
+ * start a new connection, but do not wait for the connection to be
+ * established.
+ */
+static void
+start_conn(const ClusterInfo *cluster, AsyncSlot *slot)
+{
+       PQExpBufferData conn_opts;
+
+       /* Build connection string with proper quoting */
+       initPQExpBuffer(&conn_opts);
+       appendPQExpBufferStr(&conn_opts, "dbname=");
+       appendConnStrVal(&conn_opts, cluster->dbarr.dbs[slot->db].db_name);
+       appendPQExpBufferStr(&conn_opts, " user=");
+       appendConnStrVal(&conn_opts, os_info.user);
+       appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
+       if (cluster->sockdir)
+       {
+               appendPQExpBufferStr(&conn_opts, " host=");
+               appendConnStrVal(&conn_opts, cluster->sockdir);
+       }
+
+       slot->conn = PQconnectStart(conn_opts.data);
+
+       if (!slot->conn)
+               pg_fatal("failed to create connection with connection string: 
\"%s\"",
+                                conn_opts.data);
+
+       termPQExpBuffer(&conn_opts);
+}
+
+/*
+ * Run the process_cb callback function to process the result of a query, and
+ * free the result if the caller indicated we should do so.
+ */
+static void
+process_query_result(const ClusterInfo *cluster, AsyncSlot *slot,
+                                        const AsyncTask *task)
+{
+       AsyncTaskCallbacks *cbs = &task->cbs[slot->query];
+       AsyncTaskProcessCB process_cb = cbs->process_cb;
+       DbInfo     *dbinfo = &cluster->dbarr.dbs[slot->db];
+       PGresult   *res = PQgetResult(slot->conn);
+
+       if (PQstatus(slot->conn) == CONNECTION_BAD ||
+               (PQresultStatus(res) != PGRES_TUPLES_OK &&
+                PQresultStatus(res) != PGRES_COMMAND_OK))
+               conn_failure(slot->conn);
+
+       if (process_cb)
+               (*process_cb) (dbinfo, res, cbs->arg);
+
+       if (cbs->free_result)
+               PQclear(res);
+}
+
+/*
+ * Advances the state machine for a given slot as necessary.
+ */
+static void
+process_slot(const ClusterInfo *cluster, AsyncSlot *slot, const AsyncTask *task)
+{
+       PQExpBufferData queries;
+
+       switch (slot->state)
+       {
+               case FREE:
+
+                       /*
+                        * If all of the databases in the cluster have been processed or
+                        * are currently being processed by other slots, we are done.
+                        */
+                       if (dbs_processing >= cluster->dbarr.ndbs)
+                               return;
+
+                       /*
+                        * Claim the next database in the cluster's array and initiate a
+                        * new connection.
+                        */
+                       slot->db = dbs_processing++;
+                       slot->state = CONNECTING;
+                       start_conn(cluster, slot);
+
+                       return;
+
+               case CONNECTING:
+
+                       /* Check for connection failure. */
+                       if (PQconnectPoll(slot->conn) == PGRES_POLLING_FAILED)
+                               conn_failure(slot->conn);
+
+                       /* Check whether the connection is still establishing. */
+                       if (PQconnectPoll(slot->conn) != PGRES_POLLING_OK)
+                               return;
+
+                       /*
+                        * Move on to running/processing the queries in the task.  We
+                        * combine all the queries and send them to the server together.
+                        */
+                       slot->state = RUNNING_QUERIES;
+                       initPQExpBuffer(&queries);
+                       for (int i = 0; i < task->num_cb_sets; i++)
+                               appendPQExpBuffer(&queries, "%s;", 
task->cbs[i].query);
+                       if (!PQsendQuery(slot->conn, queries.data))
+                               conn_failure(slot->conn);
+                       termPQExpBuffer(&queries);
+
+                       return;
+
+               case RUNNING_QUERIES:
+
+                       /*
+                        * Process any results that are ready so that we can free up this
+                        * slot for another database as soon as possible.
+                        */
+                       for (; slot->query < task->num_cb_sets; slot->query++)
+                       {
+                               /* If no more results are available yet, move on. */
+                               if (!PQconsumeInput(slot->conn))
+                                       conn_failure(slot->conn);
+                               if (PQisBusy(slot->conn))
+                                       return;
+
+                               process_query_result(cluster, slot, task);
+                       }
+
+                       /*
+                        * If we just finished processing the result of the last step in
+                        * the task, free the slot.  We recursively call this function on
+                        * the newly-freed slot so that we can start initiating the next
+                        * connection immediately instead of waiting for the next loop
+                        * through the slots.
+                        */
+                       dbs_complete++;
+                       (void) PQgetResult(slot->conn);
+                       PQfinish(slot->conn);
+                       memset(slot, 0, sizeof(AsyncSlot));
+
+                       process_slot(cluster, slot, task);
+
+                       return;
+       }
+}
+
+/*
+ * Wait on the slots to either finish connecting or to receive query results if
+ * possible.  This avoids a tight loop in async_task_run().
+ */
+static void
+wait_on_slots(AsyncSlot *slots, int numslots)
+{
+       fd_set          input_mask;
+       fd_set          output_mask;
+       fd_set          except_mask;
+       int                     maxFd = 0;
+
+       FD_ZERO(&input_mask);
+       FD_ZERO(&output_mask);
+       FD_ZERO(&except_mask);
+
+       for (int i = 0; i < numslots; i++)
+       {
+               int                     sock;
+               bool            read = false;
+
+               switch (slots[i].state)
+               {
+                       case FREE:
+
+                               /*
+                                * This function should only ever see free slots as we are
+                                * finishing processing the last few databases, at which point
+                                * we don't have any databases left for them to process. We'll
+                                * never use these slots again, so we can safely ignore them.
+                                */
+                               continue;
+
+                       case CONNECTING:
+
+                               /*
+                                * If we are waiting for the connection to establish, choose
+                                * whether to wait for reading or for writing on the socket as
+                                * appropriate.  If neither applies, just return immediately so
+                                * that we can handle the slot.
+                                */
+                               {
+                                       PostgresPollingStatusType status;
+
+                                       status = PQconnectPoll(slots[i].conn);
+                                       if (status == PGRES_POLLING_READING)
+                                               read = true;
+                                               else if (status != PGRES_POLLING_WRITING)
+                                               return;
+                               }
+                               break;
+
+                       case RUNNING_QUERIES:
+
+                               /*
+                                * Once we've sent the queries, we must wait for the socket to
+                                * be read-ready.  Note that process_slot() handles calling
+                                * PQconsumeInput() as required.
+                                */
+                               read = true;
+                               break;
+               }
+
+               /*
+                * If there's some problem retrieving the socket, just pretend this
+                * slot doesn't exist.  We don't expect this to happen regularly in
+                * practice, so it seems unlikely to cause too much harm.
+                */
+               sock = PQsocket(slots[i].conn);
+               if (sock < 0)
+                       continue;
+
+               /*
+                * Add the socket to the set.
+                */
+               FD_SET(sock, read ? &input_mask : &output_mask);
+               FD_SET(sock, &except_mask);
+               maxFd = Max(maxFd, sock);
+       }
+
+       /*
+        * If we found socket(s) to wait on, wait.
+        */
+       if (maxFd != 0)
+               (void) select(maxFd + 1, &input_mask, &output_mask, &except_mask, NULL);
+}
+
+/*
+ * Runs all the steps of the task in every database in the cluster using
+ * user_opts.jobs parallel slots.
+ */
+void
+async_task_run(const AsyncTask *task, const ClusterInfo *cluster)
+{
+       int                     jobs = Max(1, user_opts.jobs);
+       AsyncSlot  *slots = pg_malloc0(sizeof(AsyncSlot) * jobs);
+
+       dbs_complete = 0;
+       dbs_processing = 0;
+
+       while (dbs_complete < cluster->dbarr.ndbs)
+       {
+               for (int i = 0; i < jobs; i++)
+                       process_slot(cluster, &slots[i], task);
+
+               wait_on_slots(slots, jobs);
+       }
+
+       pg_free(slots);
+}
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 9825fa3305..9eb48e176c 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -1,6 +1,7 @@
 # Copyright (c) 2022-2024, PostgreSQL Global Development Group
 
 pg_upgrade_sources = files(
+  'async.c',
   'check.c',
   'controldata.c',
   'dump.c',
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index cdb6e2b759..5f319af34d 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -494,3 +494,17 @@ void               parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr
                                                                                  char *old_pgdata, char *new_pgdata,
                                                                                  char *old_tablespace);
 bool           reap_child(bool wait_for_child);
+
+/* async.c */
+
+typedef void (*AsyncTaskProcessCB) (DbInfo *dbinfo, PGresult *res, void *arg);
+
+/* struct definition is private to async.c */
+typedef struct AsyncTask AsyncTask;
+
+AsyncTask  *async_task_create(void);
+void           async_task_add_step(AsyncTask *task, const char *query,
+                                                               AsyncTaskProcessCB process_cb, bool free_result,
+                                                               void *arg);
+void           async_task_run(const AsyncTask *task, const ClusterInfo *cluster);
+void           async_task_free(AsyncTask *task);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 547d14b3e7..ec8106329d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -153,6 +153,10 @@ ArrayMetaState
 ArraySubWorkspace
 ArrayToken
 ArrayType
 AsyncQueueControl
 AsyncQueueEntry
 AsyncRequest
+AsyncSlot
+AsyncSlotState
+AsyncTask
+AsyncTaskCallbacks
-- 
2.39.3 (Apple Git-146)
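
(To make the intended call pattern concrete for reviewers: a hypothetical
two-step task might look like the sketch below.  The names, queries, and
state struct are illustrative only and appear nowhere in the patch; the
point is that both steps run over the same connection in each database and
can share one "arg" pointer.)

    struct my_state
    {
        FILE       *script;     /* lazily-opened report file, if needed */
        int         total;      /* running count across all databases */
    };

    static void
    count_cb(DbInfo *dbinfo, PGresult *res, void *arg)
    {
        struct my_state *state = (struct my_state *) arg;

        state->total += PQntuples(res);
    }

    static void
    my_two_step_task(ClusterInfo *cluster)
    {
        struct my_state state = {0};
        AsyncTask  *task = async_task_create();

        /* Steps run consecutively on one connection per database. */
        async_task_add_step(task, "SELECT 1", count_cb, true, &state);
        async_task_add_step(task, "SELECT 2", count_cb, true, &state);
        async_task_run(task, cluster);
        async_task_free(task);
    }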

>From 2bc5d53bd2bac29dd447a286c7e6058c70bbe348 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 17:21:19 -0500
Subject: [PATCH v9 02/11] use new pg_upgrade async API for subscription state
 checks

---
 src/bin/pg_upgrade/check.c | 204 ++++++++++++++++++++-----------------
 1 file changed, 110 insertions(+), 94 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 5038231731..48f5a0c0e6 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1905,6 +1905,79 @@ check_old_cluster_for_valid_slots(void)
        check_ok();
 }
 
+/* private state for subscription state checks */
+struct substate_info
+{
+       FILE       *script;
+       char            output_path[MAXPGPATH];
+};
+
+/*
+ * We don't allow upgrade if there is a risk of dangling slot or origin
+ * corresponding to initial sync after upgrade.
+ *
+ * A slot/origin not created yet refers to the 'i' (initialize) state, while
+ * 'r' (ready) state refers to a slot/origin created previously but already
+ * dropped. These states are supported for pg_upgrade. The other states listed
+ * below are not supported:
+ *
+ * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state would
+ * retain a replication slot, which could not be dropped by the sync worker
+ * spawned after the upgrade because the subscription ID used for the slot name
+ * won't match anymore.
+ *
+ * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state would
+ * retain the replication origin when there is a failure in tablesync worker
+ * immediately after dropping the replication slot in the publisher.
+ *
+ * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on a
+ * relation upgraded while in this state would expect an origin ID with the OID
+ * of the subscription used before the upgrade, causing it to fail.
+ *
+ * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and SUBREL_STATE_UNKNOWN:
+ * These states are not stored in the catalog, so we need not allow these
+ * states.
+ */
+static const char *
+sub_query(void)
+{
+       return "SELECT r.srsubstate, s.subname, n.nspname, c.relname "
+               "FROM pg_catalog.pg_subscription_rel r "
+               "LEFT JOIN pg_catalog.pg_subscription s"
+               "   ON r.srsubid = s.oid "
+               "LEFT JOIN pg_catalog.pg_class c"
+               "   ON r.srrelid = c.oid "
+               "LEFT JOIN pg_catalog.pg_namespace n"
+               "   ON c.relnamespace = n.oid "
+               "WHERE r.srsubstate NOT IN ('i', 'r') "
+               "ORDER BY s.subname";
+}
+
+static void
+sub_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       struct substate_info *state = (struct substate_info *) arg;
+       int                     ntup = PQntuples(res);
+       int                     i_srsubstate = PQfnumber(res, "srsubstate");
+       int                     i_subname = PQfnumber(res, "subname");
+       int                     i_nspname = PQfnumber(res, "nspname");
+       int                     i_relname = PQfnumber(res, "relname");
+
+       for (int i = 0; i < ntup; i++)
+       {
+               if (state->script == NULL &&
+                       (state->script = fopen_priv(state->output_path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", state->output_path);
+
+               fprintf(state->script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
+                               PQgetvalue(res, i, i_srsubstate),
+                               dbinfo->db_name,
+                               PQgetvalue(res, i, i_subname),
+                               PQgetvalue(res, i, i_nspname),
+                               PQgetvalue(res, i, i_relname));
+       }
+}
+
 /*
  * check_old_cluster_subscription_state()
  *
@@ -1915,115 +1988,58 @@ check_old_cluster_for_valid_slots(void)
 static void
 check_old_cluster_subscription_state(void)
 {
-       FILE       *script = NULL;
-       char            output_path[MAXPGPATH];
+       AsyncTask  *task = async_task_create();
+       struct substate_info state;
+       PGresult   *res;
+       PGconn     *conn;
        int                     ntup;
 
        prep_status("Checking for subscription state");
 
-       snprintf(output_path, sizeof(output_path), "%s/%s",
+       state.script = NULL;
+       snprintf(state.output_path, sizeof(state.output_path), "%s/%s",
                         log_opts.basedir,
                         "subs_invalid.txt");
-       for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               DbInfo     *active_db = &old_cluster.dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(&old_cluster, 
active_db->db_name);
 
-               /* We need to check for pg_replication_origin only once. */
-               if (dbnum == 0)
-               {
-                       /*
-                        * Check that all the subscriptions have their respective
-                        * replication origin.
-                        */
-                       res = executeQueryOrDie(conn,
-                                                                       "SELECT d.datname, s.subname "
-                                                                       "FROM pg_catalog.pg_subscription s "
-                                                                       "LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
-                                                                       "       ON o.roname = 'pg_' || s.oid "
-                                                                       "INNER JOIN pg_catalog.pg_database d "
-                                                                       "       ON d.oid = s.subdbid "
-                                                                       "WHERE o.roname IS NULL;");
-
-                       ntup = PQntuples(res);
-                       for (int i = 0; i < ntup; i++)
-                       {
-                               if (script == NULL && (script = 
fopen_priv(output_path, "w")) == NULL)
-                                       pg_fatal("could not open file \"%s\": 
%m", output_path);
-                               fprintf(script, "The replication origin is 
missing for database:\"%s\" subscription:\"%s\"\n",
-                                               PQgetvalue(res, i, 0),
-                                               PQgetvalue(res, i, 1));
-                       }
-                       PQclear(res);
-               }
-
-               /*
-                * We don't allow upgrade if there is a risk of dangling slot or
-                * origin corresponding to initial sync after upgrade.
-                *
-                * A slot/origin not created yet refers to the 'i' (initialize) state,
-                * while 'r' (ready) state refers to a slot/origin created previously
-                * but already dropped. These states are supported for pg_upgrade. The
-                * other states listed below are not supported:
-                *
-                * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state
-                * would retain a replication slot, which could not be dropped by the
-                * sync worker spawned after the upgrade because the subscription ID
-                * used for the slot name won't match anymore.
-                *
-                * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state
-                * would retain the replication origin when there is a failure in
-                * tablesync worker immediately after dropping the replication slot in
-                * the publisher.
-                *
-                * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on
-                * a relation upgraded while in this state would expect an origin ID
-                * with the OID of the subscription used before the upgrade, causing
-                * it to fail.
-                *
-                * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
-                * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog,
-                * so we need not allow these states.
-                */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT 
r.srsubstate, s.subname, n.nspname, c.relname "
-                                                               "FROM 
pg_catalog.pg_subscription_rel r "
-                                                               "LEFT JOIN 
pg_catalog.pg_subscription s"
-                                                               "       ON 
r.srsubid = s.oid "
-                                                               "LEFT JOIN 
pg_catalog.pg_class c"
-                                                               "       ON 
r.srrelid = c.oid "
-                                                               "LEFT JOIN 
pg_catalog.pg_namespace n"
-                                                               "       ON 
c.relnamespace = n.oid "
-                                                               "WHERE 
r.srsubstate NOT IN ('i', 'r') "
-                                                               "ORDER BY 
s.subname");
-
-               ntup = PQntuples(res);
-               for (int i = 0; i < ntup; i++)
-               {
-                       if (script == NULL && (script = fopen_priv(output_path, 
"w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
+       /*
+        * Check that all the subscriptions have their respective replication
+        * origin.  This check only needs to run once.
+        */
+       conn = connectToServer(&old_cluster, old_cluster.dbarr.dbs[0].db_name);
+       res = executeQueryOrDie(conn,
+                                                       "SELECT d.datname, 
s.subname "
+                                                       "FROM 
pg_catalog.pg_subscription s "
+                                                       "LEFT OUTER JOIN 
pg_catalog.pg_replication_origin o "
+                                                       "       ON o.roname = 
'pg_' || s.oid "
+                                                       "INNER JOIN 
pg_catalog.pg_database d "
+                                                       "       ON d.oid = 
s.subdbid "
+                                                       "WHERE o.roname IS 
NULL;");
+       ntup = PQntuples(res);
+       for (int i = 0; i < ntup; i++)
+       {
+               if (state.script == NULL &&
+                       (state.script = fopen_priv(state.output_path, "w")) == 
NULL)
+                       pg_fatal("could not open file \"%s\": %m", 
state.output_path);
+               fprintf(state.script, "The replication origin is missing for 
database:\"%s\" subscription:\"%s\"\n",
+                               PQgetvalue(res, i, 0),
+                               PQgetvalue(res, i, 1));
+       }
+       PQclear(res);
+       PQfinish(conn);
 
-                       fprintf(script, "The table sync state \"%s\" is not 
allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" 
relation:\"%s\"\n",
-                                       PQgetvalue(res, i, 0),
-                                       active_db->db_name,
-                                       PQgetvalue(res, i, 1),
-                                       PQgetvalue(res, i, 2),
-                                       PQgetvalue(res, i, 3));
-               }
+       async_task_add_step(task, sub_query(), sub_process, true, &state);
 
-               PQclear(res);
-               PQfinish(conn);
-       }
+       async_task_run(task, &old_cluster);
+       async_task_free(task);
 
-       if (script)
+       if (state.script)
        {
-               fclose(script);
+               fclose(state.script);
                pg_log(PG_REPORT, "fatal");
                pg_fatal("Your installation contains subscriptions without 
origin or having relations not in i (initialize) or r (ready) state.\n"
                                 "You can allow the initial sync to finish for 
all relations and then restart the upgrade.\n"
                                 "A list of the problematic subscriptions is in 
the file:\n"
-                                "    %s", output_path);
+                                "    %s", state.output_path);
        }
        else
                check_ok();
-- 
2.39.3 (Apple Git-146)

>From 08781ccc2871d9f3105800ba5038988097b2cb0a Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 21:09:33 -0500
Subject: [PATCH v9 03/11] use new pg_upgrade async API for retrieving relinfos

---
 src/bin/pg_upgrade/info.c | 246 +++++++++++++++++++-------------------
 1 file changed, 120 insertions(+), 126 deletions(-)

diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 5de5e10945..5d93e9716d 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -11,6 +11,7 @@
 
 #include "access/transam.h"
 #include "catalog/pg_class_d.h"
+#include "pqexpbuffer.h"
 #include "pg_upgrade.h"
 
 static void create_rel_filename_map(const char *old_data, const char *new_data,
@@ -22,12 +23,14 @@ static void report_unmatched_relation(const RelInfo *rel, const DbInfo *db,
 static void free_db_and_rel_infos(DbInfoArr *db_arr);
 static void get_template0_info(ClusterInfo *cluster);
 static void get_db_infos(ClusterInfo *cluster);
-static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
+static char *get_rel_infos_query(void);
+static void get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg);
 static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
 static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo);
+static char *get_old_cluster_logical_slot_infos_query(void);
+static void get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult *res, void *arg);
 
 
 /*
@@ -276,7 +279,9 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
 void
 get_db_rel_and_slot_infos(ClusterInfo *cluster)
 {
-       int                     dbnum;
+       AsyncTask  *task = async_task_create();
+       char       *rel_infos_query = NULL;
+       char       *logical_slot_infos_query = NULL;
 
        if (cluster->dbarr.dbs != NULL)
                free_db_and_rel_infos(&cluster->dbarr);
@@ -284,15 +289,29 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster)
        get_template0_info(cluster);
        get_db_infos(cluster);
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+       rel_infos_query = get_rel_infos_query();
+       async_task_add_step(task,
+                                               rel_infos_query,
+                                               get_rel_infos_result,
+                                               true, NULL);
+
+       if (cluster == &old_cluster &&
+               GET_MAJOR_VERSION(cluster->major_version) > 1600)
        {
-               DbInfo     *pDbInfo = &cluster->dbarr.dbs[dbnum];
+               logical_slot_infos_query = get_old_cluster_logical_slot_infos_query();
+               async_task_add_step(task,
+                                                       logical_slot_infos_query,
+                                                       get_old_cluster_logical_slot_infos_result,
+                                                       true, NULL);
+       }
 
-               get_rel_infos(cluster, pDbInfo);
+       async_task_run(task, cluster);
+       async_task_free(task);
 
-               if (cluster == &old_cluster)
-                       get_old_cluster_logical_slot_infos(pDbInfo);
-       }
+       if (rel_infos_query)
+               pg_free(rel_infos_query);
+       if (logical_slot_infos_query)
+               pg_free(logical_slot_infos_query);
 
        if (cluster == &old_cluster)
                pg_log(PG_VERBOSE, "\nsource databases:");
@@ -439,32 +458,12 @@ get_db_infos(ClusterInfo *cluster)
  * Note: the resulting RelInfo array is assumed to be sorted by OID.
  * This allows later processing to match up old and new databases efficiently.
  */
-static void
-get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
+static char *
+get_rel_infos_query(void)
 {
-       PGconn     *conn = connectToServer(cluster,
-                                                                          dbinfo->db_name);
-       PGresult   *res;
-       RelInfo    *relinfos;
-       int                     ntups;
-       int                     relnum;
-       int                     num_rels = 0;
-       char       *nspname = NULL;
-       char       *relname = NULL;
-       char       *tablespace = NULL;
-       int                     i_spclocation,
-                               i_nspname,
-                               i_relname,
-                               i_reloid,
-                               i_indtable,
-                               i_toastheap,
-                               i_relfilenumber,
-                               i_reltablespace;
-       char            query[QUERY_ALLOC];
-       char       *last_namespace = NULL,
-                          *last_tablespace = NULL;
+       PQExpBufferData query;
 
-       query[0] = '\0';                        /* initialize query string to empty */
+       initPQExpBuffer(&query);
 
        /*
         * Create a CTE that collects OIDs of regular user tables and matviews,
@@ -476,34 +475,34 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
         * output, so we have to copy that system table.  It's easiest to do that
         * by treating it as a user table.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
-                        "WITH regular_heap (reloid, indtable, toastheap) AS ( "
-                        "  SELECT c.oid, 0::oid, 0::oid "
-                        "  FROM pg_catalog.pg_class c JOIN 
pg_catalog.pg_namespace n "
-                        "         ON c.relnamespace = n.oid "
-                        "  WHERE relkind IN (" CppAsString2(RELKIND_RELATION) 
", "
-                        CppAsString2(RELKIND_MATVIEW) ") AND "
+       appendPQExpBuffer(&query,
+                                         "WITH regular_heap (reloid, indtable, 
toastheap) AS ( "
+                                         "  SELECT c.oid, 0::oid, 0::oid "
+                                         "  FROM pg_catalog.pg_class c JOIN 
pg_catalog.pg_namespace n "
+                                         "         ON c.relnamespace = n.oid "
+                                         "  WHERE relkind IN (" 
CppAsString2(RELKIND_RELATION) ", "
+                                         CppAsString2(RELKIND_MATVIEW) ") AND "
        /* exclude possible orphaned temp tables */
-                        "    ((n.nspname !~ '^pg_temp_' AND "
-                        "      n.nspname !~ '^pg_toast_temp_' AND "
-                        "      n.nspname NOT IN ('pg_catalog', 
'information_schema', "
-                        "                        'binary_upgrade', 'pg_toast') 
AND "
-                        "      c.oid >= %u::pg_catalog.oid) OR "
-                        "     (n.nspname = 'pg_catalog' AND "
-                        "      relname IN ('pg_largeobject') ))), ",
-                        FirstNormalObjectId);
+                                         "    ((n.nspname !~ '^pg_temp_' AND "
+                                         "      n.nspname !~ '^pg_toast_temp_' 
AND "
+                                         "      n.nspname NOT IN 
('pg_catalog', 'information_schema', "
+                                         "                        
'binary_upgrade', 'pg_toast') AND "
+                                         "      c.oid >= %u::pg_catalog.oid) 
OR "
+                                         "     (n.nspname = 'pg_catalog' AND "
+                                         "      relname IN ('pg_largeobject') 
))), ",
+                                         FirstNormalObjectId);
 
        /*
         * Add a CTE that collects OIDs of toast tables belonging to the tables
         * selected by the regular_heap CTE.  (We have to do this separately
         * because the namespace-name rules above don't work for toast tables.)
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
-                        "  toast_heap (reloid, indtable, toastheap) AS ( "
-                        "  SELECT c.reltoastrelid, 0::oid, c.oid "
-                        "  FROM regular_heap JOIN pg_catalog.pg_class c "
-                        "      ON regular_heap.reloid = c.oid "
-                        "  WHERE c.reltoastrelid != 0), ");
+       appendPQExpBufferStr(&query,
+                                                "  toast_heap (reloid, 
indtable, toastheap) AS ( "
+                                                "  SELECT c.reltoastrelid, 
0::oid, c.oid "
+                                                "  FROM regular_heap JOIN 
pg_catalog.pg_class c "
+                                                "      ON regular_heap.reloid 
= c.oid "
+                                                "  WHERE c.reltoastrelid != 
0), ");
 
        /*
         * Add a CTE that collects OIDs of all valid indexes on the previously
@@ -511,53 +510,61 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
         * Testing indisready is necessary in 9.2, and harmless in earlier/later
         * versions.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
-                        "  all_index (reloid, indtable, toastheap) AS ( "
-                        "  SELECT indexrelid, indrelid, 0::oid "
-                        "  FROM pg_catalog.pg_index "
-                        "  WHERE indisvalid AND indisready "
-                        "    AND indrelid IN "
-                        "        (SELECT reloid FROM regular_heap "
-                        "         UNION ALL "
-                        "         SELECT reloid FROM toast_heap)) ");
+       appendPQExpBufferStr(&query,
+                                                "  all_index (reloid, 
indtable, toastheap) AS ( "
+                                                "  SELECT indexrelid, 
indrelid, 0::oid "
+                                                "  FROM pg_catalog.pg_index "
+                                                "  WHERE indisvalid AND 
indisready "
+                                                "    AND indrelid IN "
+                                                "        (SELECT reloid FROM 
regular_heap "
+                                                "         UNION ALL "
+                                                "         SELECT reloid FROM 
toast_heap)) ");
 
        /*
         * And now we can write the query that retrieves the data we want for each
         * heap and index relation.  Make sure result is sorted by OID.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
-                        "SELECT all_rels.*, n.nspname, c.relname, "
-                        "  c.relfilenode, c.reltablespace, "
-                        "  pg_catalog.pg_tablespace_location(t.oid) AS 
spclocation "
-                        "FROM (SELECT * FROM regular_heap "
-                        "      UNION ALL "
-                        "      SELECT * FROM toast_heap "
-                        "      UNION ALL "
-                        "      SELECT * FROM all_index) all_rels "
-                        "  JOIN pg_catalog.pg_class c "
-                        "      ON all_rels.reloid = c.oid "
-                        "  JOIN pg_catalog.pg_namespace n "
-                        "     ON c.relnamespace = n.oid "
-                        "  LEFT OUTER JOIN pg_catalog.pg_tablespace t "
-                        "     ON c.reltablespace = t.oid "
-                        "ORDER BY 1;");
-
-       res = executeQueryOrDie(conn, "%s", query);
-
-       ntups = PQntuples(res);
-
-       relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+       appendPQExpBufferStr(&query,
+                                                "SELECT all_rels.*, n.nspname, 
c.relname, "
+                                                "  c.relfilenode, 
c.reltablespace, "
+                                                "  
pg_catalog.pg_tablespace_location(t.oid) AS spclocation "
+                                                "FROM (SELECT * FROM 
regular_heap "
+                                                "      UNION ALL "
+                                                "      SELECT * FROM 
toast_heap "
+                                                "      UNION ALL "
+                                                "      SELECT * FROM 
all_index) all_rels "
+                                                "  JOIN pg_catalog.pg_class c "
+                                                "      ON all_rels.reloid = 
c.oid "
+                                                "  JOIN 
pg_catalog.pg_namespace n "
+                                                "     ON c.relnamespace = 
n.oid "
+                                                "  LEFT OUTER JOIN 
pg_catalog.pg_tablespace t "
+                                                "     ON c.reltablespace = 
t.oid "
+                                                "ORDER BY 1;");
+
+       return query.data;
+}
 
-       i_reloid = PQfnumber(res, "reloid");
-       i_indtable = PQfnumber(res, "indtable");
-       i_toastheap = PQfnumber(res, "toastheap");
-       i_nspname = PQfnumber(res, "nspname");
-       i_relname = PQfnumber(res, "relname");
-       i_relfilenumber = PQfnumber(res, "relfilenode");
-       i_reltablespace = PQfnumber(res, "reltablespace");
-       i_spclocation = PQfnumber(res, "spclocation");
+static void
+get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       int                     ntups = PQntuples(res);
+       RelInfo    *relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+       int                     i_reloid = PQfnumber(res, "reloid");
+       int                     i_indtable = PQfnumber(res, "indtable");
+       int                     i_toastheap = PQfnumber(res, "toastheap");
+       int                     i_nspname = PQfnumber(res, "nspname");
+       int                     i_relname = PQfnumber(res, "relname");
+       int                     i_relfilenumber = PQfnumber(res, "relfilenode");
+       int                     i_reltablespace = PQfnumber(res, "reltablespace");
+       int                     i_spclocation = PQfnumber(res, "spclocation");
+       int                     num_rels = 0;
+       char       *nspname = NULL;
+       char       *relname = NULL;
+       char       *tablespace = NULL;
+       char       *last_namespace = NULL;
+       char       *last_tablespace = NULL;
 
-       for (relnum = 0; relnum < ntups; relnum++)
+       for (int relnum = 0; relnum < ntups; relnum++)
        {
                RelInfo    *curr = &relinfos[num_rels++];
 
@@ -610,9 +617,6 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
                        /* A zero reltablespace oid indicates the database tablespace. */
                        curr->tablespace = dbinfo->db_tablespace;
        }
-       PQclear(res);
-
-       PQfinish(conn);
 
        dbinfo->rel_arr.rels = relinfos;
        dbinfo->rel_arr.nrels = num_rels;
@@ -634,20 +638,9 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
  * check_old_cluster_for_valid_slots() would raise a FATAL error if such slots
  * are included.
  */
-static void
-get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
+static char *
+get_old_cluster_logical_slot_infos_query(void)
 {
-       PGconn     *conn;
-       PGresult   *res;
-       LogicalSlotInfo *slotinfos = NULL;
-       int                     num_slots;
-
-       /* Logical slots can be migrated since PG17. */
-       if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
-               return;
-
-       conn = connectToServer(&old_cluster, dbinfo->db_name);
-
        /*
         * Fetch the logical replication slot information. The check whether the
         * slot is considered caught up is done by an upgrade function. This
@@ -665,18 +658,23 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
         * started and stopped several times causing any temporary slots to be
         * removed.
         */
-       res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, 
failover, "
-                                                       "%s as caught_up, 
invalidation_reason IS NOT NULL as invalid "
-                                                       "FROM 
pg_catalog.pg_replication_slots "
-                                                       "WHERE slot_type = 
'logical' AND "
-                                                       "database = 
current_database() AND "
-                                                       "temporary IS FALSE;",
-                                                       user_opts.live_check ? 
"FALSE" :
-                                                       "(CASE WHEN 
invalidation_reason IS NOT NULL THEN FALSE "
-                                                       "ELSE (SELECT 
pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
-                                                       "END)");
-
-       num_slots = PQntuples(res);
+       return psprintf("SELECT slot_name, plugin, two_phase, failover, "
+                                       "%s as caught_up, invalidation_reason 
IS NOT NULL as invalid "
+                                       "FROM pg_catalog.pg_replication_slots "
+                                       "WHERE slot_type = 'logical' AND "
+                                       "database = current_database() AND "
+                                       "temporary IS FALSE;",
+                                       user_opts.live_check ? "FALSE" :
+                                       "(CASE WHEN invalidation_reason IS NOT 
NULL THEN FALSE "
+                                       "ELSE (SELECT 
pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
+                                       "END)");
+}
+
+static void
+get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       LogicalSlotInfo *slotinfos = NULL;
+       int                     num_slots = PQntuples(res);
 
        if (num_slots)
        {
@@ -709,14 +707,10 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
                }
        }
 
-       PQclear(res);
-       PQfinish(conn);
-
        dbinfo->slot_arr.slots = slotinfos;
        dbinfo->slot_arr.nslots = num_slots;
 }
 
-
 /*
  * count_old_cluster_logical_slots()
  *
-- 
2.39.3 (Apple Git-146)

>From dd92f014151f5a356e8f732eb1773d1d825baed7 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 21:24:35 -0500
Subject: [PATCH v9 04/11] use new pg_upgrade async API to parallelize getting
 loadable libraries

---
 src/bin/pg_upgrade/function.c | 49 +++++++++++++++++++++--------------
 1 file changed, 29 insertions(+), 20 deletions(-)

diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c
index 7e3abed098..75e5ebb2c8 100644
--- a/src/bin/pg_upgrade/function.c
+++ b/src/bin/pg_upgrade/function.c
@@ -42,6 +42,20 @@ library_name_compare(const void *p1, const void *p2)
                                          ((const LibraryInfo *) p2)->dbnum);
 }
 
+struct loadable_libraries_state
+{
+       PGresult  **ress;
+       int                     totaltups;
+};
+
+static void
+get_loadable_libraries_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       struct loadable_libraries_state *state = (struct loadable_libraries_state *) arg;
+
+       state->ress[dbinfo - old_cluster.dbarr.dbs] = res;
+       state->totaltups += PQntuples(res);
+}
 
 /*
  * get_loadable_libraries()
@@ -54,47 +68,41 @@ library_name_compare(const void *p1, const void *p2)
 void
 get_loadable_libraries(void)
 {
-       PGresult  **ress;
        int                     totaltups;
        int                     dbnum;
        int                     n_libinfos;
+       AsyncTask  *task = async_task_create();
+       struct loadable_libraries_state state;
+       char       *loadable_libraries_query;
 
-       ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
-       totaltups = 0;
-
-       /* Fetch all library names, removing duplicates within each DB */
-       for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
-       {
-               DbInfo     *active_db = &old_cluster.dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(&old_cluster, 
active_db->db_name);
+       state.ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * 
sizeof(PGresult *));
+       state.totaltups = 0;
 
-               /*
-                * Fetch all libraries containing non-built-in C functions in this DB.
-                */
-               ress[dbnum] = executeQueryOrDie(conn,
-                                                                               "SELECT DISTINCT probin "
+               loadable_libraries_query = psprintf("SELECT DISTINCT probin "
                                                                                "FROM pg_catalog.pg_proc "
                                                                                "WHERE prolang = %u AND "
                                                                                "probin IS NOT NULL AND "
                                                                                "oid >= %u;",
                                                                                ClanguageId,
                                                                                FirstNormalObjectId);
-               totaltups += PQntuples(ress[dbnum]);
 
-               PQfinish(conn);
-       }
+       async_task_add_step(task, loadable_libraries_query,
+                                               get_loadable_libraries_result, false, &state);
+
+       async_task_run(task, &old_cluster);
+       async_task_free(task);
 
        /*
         * Allocate memory for required libraries and logical replication output
         * plugins.
         */
-       n_libinfos = totaltups + count_old_cluster_logical_slots();
+       n_libinfos = state.totaltups + count_old_cluster_logical_slots();
        os_info.libraries = (LibraryInfo *) pg_malloc(sizeof(LibraryInfo) * n_libinfos);
        totaltups = 0;
 
        for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
        {
-               PGresult   *res = ress[dbnum];
+               PGresult   *res = state.ress[dbnum];
                int                     ntups;
                int                     rowno;
                LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
@@ -129,7 +137,8 @@ get_loadable_libraries(void)
                }
        }
 
-       pg_free(ress);
+       pg_free(state.ress);
+       pg_free(loadable_libraries_query);
 
        os_info.num_libraries = totaltups;
 }
-- 
2.39.3 (Apple Git-146)

>From 07a53259d5d5e52b081194ec3a9604192fcf3407 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 21:31:57 -0500
Subject: [PATCH v9 05/11] use new pg_upgrade async API to parallelize
 reporting extension updates

---
 src/bin/pg_upgrade/version.c | 77 +++++++++++++++++-------------------
 1 file changed, 36 insertions(+), 41 deletions(-)

diff --git a/src/bin/pg_upgrade/version.c b/src/bin/pg_upgrade/version.c
index 2de6dffccd..dd2b0cd975 100644
--- a/src/bin/pg_upgrade/version.c
+++ b/src/bin/pg_upgrade/version.c
@@ -139,6 +139,34 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
                check_ok();
 }
 
+static void
+report_extension_updates_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       int                     i_name = PQfnumber(res, "name");
+       char       *output_path = "update_extensions.sql";
+       FILE      **script = (FILE **) arg;
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (*script == NULL && (*script = fopen_priv(output_path, "w")) 
== NULL)
+                       pg_fatal("could not open file \"%s\": %m", output_path);
+               if (!db_used)
+               {
+                       PQExpBufferData connectbuf;
+
+                       initPQExpBuffer(&connectbuf);
+                       appendPsqlMetaConnect(&connectbuf, dbinfo->db_name);
+                       fputs(connectbuf.data, *script);
+                       termPQExpBuffer(&connectbuf);
+                       db_used = true;
+               }
+               fprintf(*script, "ALTER EXTENSION %s UPDATE;\n",
+                               quote_identifier(PQgetvalue(res, rowno, i_name)));
+       }
+}
+
 /*
  * report_extension_updates()
  *     Report extensions that should be updated.
@@ -146,53 +174,20 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
 void
 report_extension_updates(ClusterInfo *cluster)
 {
-       int                     dbnum;
        FILE       *script = NULL;
        char       *output_path = "update_extensions.sql";
+       AsyncTask  *task = async_task_create();
+       const char *query = "SELECT name "
+               "FROM pg_available_extensions "
+               "WHERE installed_version != default_version";
 
        prep_status("Checking for extension updates");
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_name;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /* find extensions needing updates */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT name "
-                                                               "FROM pg_available_extensions "
-                                                               "WHERE installed_version != default_version"
-                       );
+       async_task_add_step(task, query, report_extension_updates_result,
+                                               true, &script);
 
-               ntups = PQntuples(res);
-               i_name = PQfnumber(res, "name");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", output_path);
-                       if (!db_used)
-                       {
-                               PQExpBufferData connectbuf;
-
-                               initPQExpBuffer(&connectbuf);
-                               appendPsqlMetaConnect(&connectbuf, active_db->db_name);
-                               fputs(connectbuf.data, script);
-                               termPQExpBuffer(&connectbuf);
-                               db_used = true;
-                       }
-                       fprintf(script, "ALTER EXTENSION %s UPDATE;\n",
-                                       quote_identifier(PQgetvalue(res, rowno, i_name)));
-               }
-
-               PQclear(res);
-
-               PQfinish(conn);
-       }
+       async_task_run(task, cluster);
+       async_task_free(task);
 
        if (script)
        {
-- 
2.39.3 (Apple Git-146)
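
(A note on the &script wiring, since the remaining patches all use it: as I
read async.c, the callbacks run serially in the pg_upgrade process as results
come back -- the parallelism is in the connections, not in threads -- so
handing every step the same bare FILE ** needs no locking, and the report
file is only created once the first offending row shows up in any database.
Distilled to its essentials, with an invented callback name and file name:

	static void
	report_cb(DbInfo *dbinfo, PGresult *res, void *arg)
	{
		FILE	  **script = (FILE **) arg;

		for (int rowno = 0; rowno < PQntuples(res); rowno++)
		{
			/* lazily open the report on the first hit across all databases */
			if (*script == NULL && (*script = fopen_priv("report.txt", "w")) == NULL)
				pg_fatal("could not open file \"%s\": %m", "report.txt");
			fprintf(*script, "  %s\n", PQgetvalue(res, rowno, 0));
		}
	}

Afterwards the caller just tests "if (script)" to decide between printing the
failure report and check_ok().)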

>From d54d60a2d71bee556c6b9343beafe40fb5bc2f54 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Sat, 6 Jul 2024 21:06:31 -0500
Subject: [PATCH v9 06/11] parallelize data type checks in pg_upgrade

---
 src/bin/pg_upgrade/check.c | 333 +++++++++++++++++++------------------
 1 file changed, 173 insertions(+), 160 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 48f5a0c0e6..150c101d1c 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -314,6 +314,133 @@ static DataTypesUsageChecks data_types_usage_checks[] =
        }
 };
 
+struct data_type_check_state
+{
+       DataTypesUsageChecks *check;
+       bool            result;
+       PQExpBuffer *report;
+};
+
+static char *
+data_type_check_query(int checknum)
+{
+       DataTypesUsageChecks *check = &data_types_usage_checks[checknum];
+
+       return psprintf("WITH RECURSIVE oids AS ( "
+       /* start with the type(s) returned by base_query */
+                                       "       %s "
+                                       "       UNION ALL "
+                                       "       SELECT * FROM ( "
+       /* inner WITH because we can only reference the CTE once */
+                                       "               WITH x AS (SELECT oid FROM oids) "
+       /* domains on any type selected so far */
+                                       "                       SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
+                                       "                       UNION ALL "
+       /* arrays over any type selected so far */
+                                       "                       SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
+                                       "                       UNION ALL "
+       /* composite types containing any type selected so far */
+                                       "                       SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
+                                       "                       WHERE t.typtype = 'c' AND "
+                                       "                                 t.oid = c.reltype AND "
+                                       "                                 c.oid = a.attrelid AND "
+                                       "                                 NOT a.attisdropped AND "
+                                       "                                 a.atttypid = x.oid "
+                                       "                       UNION ALL "
+       /* ranges containing any type selected so far */
+                                       "                       SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
+                                       "                       WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
+                                       "       ) foo "
+                                       ") "
+       /* now look for stored columns of any such type */
+                                       "SELECT n.nspname, c.relname, a.attname "
+                                       "FROM   pg_catalog.pg_class c, "
+                                       "               pg_catalog.pg_namespace n, "
+                                       "               pg_catalog.pg_attribute a "
+                                       "WHERE  c.oid = a.attrelid AND "
+                                       "               NOT a.attisdropped AND "
+                                       "               a.atttypid IN (SELECT oid FROM oids) AND "
+                                       "               c.relkind IN ("
+                                       CppAsString2(RELKIND_RELATION) ", "
+                                       CppAsString2(RELKIND_MATVIEW) ", "
+                                       CppAsString2(RELKIND_INDEX) ") AND "
+                                       "               c.relnamespace = n.oid AND "
+       /* exclude possible orphaned temp tables */
+                                       "               n.nspname !~ '^pg_temp_' AND "
+                                       "               n.nspname !~ '^pg_toast_temp_' AND "
+       /* exclude system catalogs, too */
+                                       "               n.nspname NOT IN ('pg_catalog', 'information_schema')",
+                                       check->base_query);
+}
+
+static void
+data_type_check_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       struct data_type_check_state *state = (struct data_type_check_state *) arg;
+       int                     ntups = PQntuples(res);
+
+       if (ntups)
+       {
+               char            output_path[MAXPGPATH];
+               int                     i_nspname;
+               int                     i_relname;
+               int                     i_attname;
+               FILE       *script = NULL;
+               bool            db_used = false;
+
+               snprintf(output_path, sizeof(output_path), "%s/%s",
+                                log_opts.basedir,
+                                state->check->report_filename);
+
+               /*
+                * Make sure we have a buffer to save reports to now that we found a
+                * first failing check.
+                */
+               if (*state->report == NULL)
+                       *state->report = createPQExpBuffer();
+
+               /*
+                * If this is the first time we see an error for the check in question
+                * then print a status message of the failure.
+                */
+               if (!state->result)
+               {
+                       pg_log(PG_REPORT, "    failed check: %s", _(state->check->status));
+                       appendPQExpBuffer(*state->report, "\n%s\n%s    %s\n",
+                                                         _(state->check->report_text),
+                                                         _("A list of the problem columns is in the file:"),
+                                                         output_path);
+               }
+               state->result = true;
+
+               i_nspname = PQfnumber(res, "nspname");
+               i_relname = PQfnumber(res, "relname");
+               i_attname = PQfnumber(res, "attname");
+
+               for (int rowno = 0; rowno < ntups; rowno++)
+               {
+                       if (script == NULL && (script = fopen_priv(output_path, "a")) == NULL)
+                               pg_fatal("could not open file \"%s\": %m", output_path);
+
+                       if (!db_used)
+                       {
+                               fprintf(script, "In database: %s\n", dbinfo->db_name);
+                               db_used = true;
+                       }
+                       fprintf(script, "  %s.%s.%s\n",
+                                       PQgetvalue(res, rowno, i_nspname),
+                                       PQgetvalue(res, rowno, i_relname),
+                                       PQgetvalue(res, rowno, i_attname));
+               }
+
+               if (script)
+               {
+                       fclose(script);
+                       script = NULL;
+               }
+       }
+}
+
 /*
  * check_for_data_types_usage()
  *     Detect whether there are any stored columns depending on given type(s)
@@ -336,11 +463,12 @@ static DataTypesUsageChecks data_types_usage_checks[] =
 static void
 check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks)
 {
-       bool            found = false;
-       bool       *results;
-       PQExpBufferData report;
        DataTypesUsageChecks *tmp = checks;
        int                     n_data_types_usage_checks = 0;
+       AsyncTask  *task = async_task_create();
+       char      **queries = NULL;
+       struct data_type_check_state *states;
+       PQExpBuffer report = NULL;
 
        prep_status("Checking for data type usage");
 
@@ -352,176 +480,61 @@ check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks)
        }
 
        /* Prepare an array to store the results of checks in */
-       results = pg_malloc0(sizeof(bool) * n_data_types_usage_checks);
+       queries = pg_malloc0(sizeof(char *) * n_data_types_usage_checks);
+       states = pg_malloc0(sizeof(struct data_type_check_state) * n_data_types_usage_checks);
 
-       /*
-        * Connect to each database in the cluster and run all defined checks
-        * against that database before trying the next one.
-        */
-       for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+       for (int i = 0; i < n_data_types_usage_checks; i++)
        {
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
+               DataTypesUsageChecks *check = &checks[i];
 
-               for (int checknum = 0; checknum < n_data_types_usage_checks; checknum++)
+               if (check->threshold_version == MANUAL_CHECK)
                {
-                       PGresult   *res;
-                       int                     ntups;
-                       int                     i_nspname;
-                       int                     i_relname;
-                       int                     i_attname;
-                       FILE       *script = NULL;
-                       bool            db_used = false;
-                       char            output_path[MAXPGPATH];
-                       DataTypesUsageChecks *cur_check = &checks[checknum];
-
-                       if (cur_check->threshold_version == MANUAL_CHECK)
-                       {
-                               Assert(cur_check->version_hook);
-
-                               /*
-                                * Make sure that the check applies to the current cluster
-                                * version and skip if not. If no check hook has been defined
-                                * we run the check for all versions.
-                                */
-                               if (!cur_check->version_hook(cluster))
-                                       continue;
-                       }
-                       else if (cur_check->threshold_version != ALL_VERSIONS)
-                       {
-                               if (GET_MAJOR_VERSION(cluster->major_version) > cur_check->threshold_version)
-                                       continue;
-                       }
-                       else
-                               Assert(cur_check->threshold_version == ALL_VERSIONS);
-
-                       snprintf(output_path, sizeof(output_path), "%s/%s",
-                                        log_opts.basedir,
-                                        cur_check->report_filename);
+                       Assert(check->version_hook);
 
                        /*
-                        * The type(s) of interest might be wrapped in a domain, array,
-                        * composite, or range, and these container types can be nested
-                        * (to varying extents depending on server version, but that's not
-                        * of concern here).  To handle all these cases we need a
-                        * recursive CTE.
+                        * Make sure that the check applies to the current cluster version
+                        * and skip it if not.
                         */
-                       res = executeQueryOrDie(conn,
-                                                                       "WITH RECURSIVE oids AS ( "
-                       /* start with the type(s) returned by base_query */
-                                                                       "       %s "
-                                                                       "       UNION ALL "
-                                                                       "       SELECT * FROM ( "
-                       /* inner WITH because we can only reference the CTE once */
-                                                                       "               WITH x AS (SELECT oid FROM oids) "
-                       /* domains on any type selected so far */
-                                                                       "                       SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
-                                                                       "                       UNION ALL "
-                       /* arrays over any type selected so far */
-                                                                       "                       SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
-                                                                       "                       UNION ALL "
-                       /* composite types containing any type selected so far */
-                                                                       "                       SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
-                                                                       "                       WHERE t.typtype = 'c' AND "
-                                                                       "                                 t.oid = c.reltype AND "
-                                                                       "                                 c.oid = a.attrelid AND "
-                                                                       "                                 NOT a.attisdropped AND "
-                                                                       "                                 a.atttypid = x.oid "
-                                                                       "                       UNION ALL "
-                       /* ranges containing any type selected so far */
-                                                                       "                       SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
-                                                                       "                       WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
-                                                                       "       ) foo "
-                                                                       ") "
-                       /* now look for stored columns of any such type */
-                                                                       "SELECT n.nspname, c.relname, a.attname "
-                                                                       "FROM   pg_catalog.pg_class c, "
-                                                                       "               pg_catalog.pg_namespace n, "
-                                                                       "               pg_catalog.pg_attribute a "
-                                                                       "WHERE  c.oid = a.attrelid AND "
-                                                                       "               NOT a.attisdropped AND "
-                                                                       "               a.atttypid IN (SELECT oid FROM oids) AND "
-                                                                       "               c.relkind IN ("
-                                                                       CppAsString2(RELKIND_RELATION) ", "
-                                                                       CppAsString2(RELKIND_MATVIEW) ", "
-                                                                       CppAsString2(RELKIND_INDEX) ") AND "
-                                                                       "               c.relnamespace = n.oid AND "
-                       /* exclude possible orphaned temp tables */
-                                                                       "               n.nspname !~ '^pg_temp_' AND "
-                                                                       "               n.nspname !~ '^pg_toast_temp_' AND "
-                       /* exclude system catalogs, too */
-                                                                       "               n.nspname NOT IN ('pg_catalog', 'information_schema')",
-                                                                       cur_check->base_query);
-
-                       ntups = PQntuples(res);
+                       if (!check->version_hook(cluster))
+                               continue;
+               }
+               else if (check->threshold_version != ALL_VERSIONS)
+               {
+                       if (GET_MAJOR_VERSION(cluster->major_version) > check->threshold_version)
+                               continue;
+               }
+               else
+                       Assert(check->threshold_version == ALL_VERSIONS);
 
-                       /*
-                        * The datatype was found, so extract the data and log to the
-                        * requested filename. We need to open the file for appending
-                        * since the check might have already found the type in another
-                        * database earlier in the loop.
-                        */
-                       if (ntups)
-                       {
-                               /*
-                                * Make sure we have a buffer to save reports to now that we
-                                * found a first failing check.
-                                */
-                               if (!found)
-                                       initPQExpBuffer(&report);
-                               found = true;
-
-                               /*
-                                * If this is the first time we see an error for the check in
-                                * question then print a status message of the failure.
-                                */
-                               if (!results[checknum])
-                               {
-                                       pg_log(PG_REPORT, "    failed check: %s", _(cur_check->status));
-                                       appendPQExpBuffer(&report, "\n%s\n%s    %s\n",
-                                                                         _(cur_check->report_text),
-                                                                         _("A list of the problem columns is in the file:"),
-                                                                         output_path);
-                               }
-                               results[checknum] = true;
-
-                               i_nspname = PQfnumber(res, "nspname");
-                               i_relname = PQfnumber(res, "relname");
-                               i_attname = PQfnumber(res, "attname");
-
-                               for (int rowno = 0; rowno < ntups; rowno++)
-                               {
-                                       if (script == NULL && (script = fopen_priv(output_path, "a")) == NULL)
-                                               pg_fatal("could not open file \"%s\": %m", output_path);
-
-                                       if (!db_used)
-                                       {
-                                               fprintf(script, "In database: %s\n", active_db->db_name);
-                                               db_used = true;
-                                       }
-                                       fprintf(script, "  %s.%s.%s\n",
-                                                       PQgetvalue(res, rowno, i_nspname),
-                                                       PQgetvalue(res, rowno, i_relname),
-                                                       PQgetvalue(res, rowno, i_attname));
-                               }
-
-                               if (script)
-                               {
-                                       fclose(script);
-                                       script = NULL;
-                               }
-                       }
+               queries[i] = data_type_check_query(i);
 
-                       PQclear(res);
-               }
+               states[i].check = &data_types_usage_checks[i];
+               states[i].report = &report;
 
-               PQfinish(conn);
+               async_task_add_step(task, queries[i],
+                                                       data_type_check_process, true, &states[i]);
        }
 
-       if (found)
-               pg_fatal("Data type checks failed: %s", report.data);
+       /*
+        * Connect to each database in the cluster and run all defined checks
+        * against that database before trying the next one.
+        */
+       async_task_run(task, cluster);
+       async_task_free(task);
 
-       pg_free(results);
+       if (report)
+       {
+               pg_fatal("Data type checks failed: %s", report->data);
+               destroyPQExpBuffer(report);
+       }
+
+       for (int i = 0; i < n_data_types_usage_checks; i++)
+       {
+               if (queries[i])
+                       pg_free(queries[i]);
+       }
+       pg_free(queries);
+       pg_free(states);
 
        check_ok();
 }
-- 
2.39.3 (Apple Git-146)
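
(This is the first patch that adds more than one step to a single task, so
here is that wiring pattern in isolation.  A hypothetical two-step task, with
invented queries and names; each step gets its own arg, exactly like the
states[i] array above:

	static void
	count_cb(DbInfo *dbinfo, PGresult *res, void *arg)
	{
		int		   *count = (int *) arg;

		*count += PQntuples(res);	/* invoked once per database for this step */
	}

	static void
	count_things(ClusterInfo *cluster)
	{
		int			nrels = 0;
		int			nprocs = 0;
		AsyncTask  *task = async_task_create();

		async_task_add_step(task, "SELECT 1 FROM pg_catalog.pg_class",
							count_cb, true, &nrels);
		async_task_add_step(task, "SELECT 1 FROM pg_catalog.pg_proc",
							count_cb, true, &nprocs);
		async_task_run(task, cluster);
		async_task_free(task);
	}

check_for_data_types_usage() does the same with one step per data type check,
giving each step its own state while all of them share the report buffer
through a pointer.  Note also that the queries[] strings come from psprintf()
and are therefore freed only after async_task_run() returns.)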

>From 8d0580f5b64639a8b691ee33466294518502f8f9 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 8 Jul 2024 21:00:20 -0500
Subject: [PATCH v9 07/11] parallelize isn and int8 passing mismatch check in
 pg_upgrade

---
 src/bin/pg_upgrade/check.c | 80 +++++++++++++++++++-------------------
 1 file changed, 39 insertions(+), 41 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 150c101d1c..5642404a2f 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1207,6 +1207,34 @@ check_for_prepared_transactions(ClusterInfo *cluster)
        check_ok();
 }
 
+static void
+isn_and_int8_passing_mismatch_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       int                     i_nspname = PQfnumber(res, "nspname");
+       int                     i_proname = PQfnumber(res, "proname");
+       FILE      **script = (FILE **) arg;
+       char            output_path[MAXPGPATH];
+
+       snprintf(output_path, sizeof(output_path), "%s/%s",
+                        log_opts.basedir,
+                        "contrib_isn_and_int8_pass_by_value.txt");
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (*script == NULL && (*script = fopen_priv(output_path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", output_path);
+               if (!db_used)
+               {
+                       fprintf(*script, "In database: %s\n", dbinfo->db_name);
+                       db_used = true;
+               }
+               fprintf(*script, "  %s.%s\n",
+                               PQgetvalue(res, rowno, i_nspname),
+                               PQgetvalue(res, rowno, i_proname));
+       }
+}
 
 /*
  *     check_for_isn_and_int8_passing_mismatch()
@@ -1218,9 +1246,14 @@ check_for_prepared_transactions(ClusterInfo *cluster)
 static void
 check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
 {
-       int                     dbnum;
        FILE       *script = NULL;
+       AsyncTask  *task;
        char            output_path[MAXPGPATH];
+       const char *query = "SELECT n.nspname, p.proname "
+               "FROM   pg_catalog.pg_proc p, "
+               "       pg_catalog.pg_namespace n "
+               "WHERE  p.pronamespace = n.oid AND "
+               "       p.probin = '$libdir/isn'";
 
        prep_status("Checking for contrib/isn with bigint-passing mismatch");
 
@@ -1236,46 +1269,11 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
                         log_opts.basedir,
                         "contrib_isn_and_int8_pass_by_value.txt");
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_nspname,
-                                       i_proname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /* Find any functions coming from contrib/isn */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT n.nspname, p.proname "
-                                                               "FROM   pg_catalog.pg_proc p, "
-                                                               "               pg_catalog.pg_namespace n "
-                                                               "WHERE  p.pronamespace = n.oid AND "
-                                                               "               p.probin = '$libdir/isn'");
-
-               ntups = PQntuples(res);
-               i_nspname = PQfnumber(res, "nspname");
-               i_proname = PQfnumber(res, "proname");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  %s.%s\n",
-                                       PQgetvalue(res, rowno, i_nspname),
-                                       PQgetvalue(res, rowno, i_proname));
-               }
-
-               PQclear(res);
-
-               PQfinish(conn);
-       }
+       task = async_task_create();
+       async_task_add_step(task, query,
+                                               isn_and_int8_passing_mismatch_process, true, &script);
+       async_task_run(task, cluster);
+       async_task_free(task);
 
        if (script)
        {
-- 
2.39.3 (Apple Git-146)

>From 095269b37b48ff76f3fb0065a99a7f78be0f24a3 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 8 Jul 2024 21:12:49 -0500
Subject: [PATCH v9 08/11] parallelize user defined postfix ops check in
 pg_upgrade

---
 src/bin/pg_upgrade/check.c | 131 +++++++++++++++++++------------------
 1 file changed, 66 insertions(+), 65 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 5642404a2f..25cd10c000 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1291,15 +1291,76 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
                check_ok();
 }
 
+static void
+user_defined_postfix_ops_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       FILE      **script = (FILE **) arg;
+       char            output_path[MAXPGPATH];
+       int                     ntups = PQntuples(res);
+       bool            db_used = false;
+       int                     i_oproid = PQfnumber(res, "oproid");
+       int                     i_oprnsp = PQfnumber(res, "oprnsp");
+       int                     i_oprname = PQfnumber(res, "oprname");
+       int                     i_typnsp = PQfnumber(res, "typnsp");
+       int                     i_typname = PQfnumber(res, "typname");
+
+       if (!ntups)
+               return;
+
+       snprintf(output_path, sizeof(output_path), "%s/%s",
+                        log_opts.basedir,
+                        "postfix_ops.txt");
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (*script == NULL &&
+                       (*script = fopen_priv(output_path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", output_path);
+               if (!db_used)
+               {
+                       fprintf(*script, "In database: %s\n", dbinfo->db_name);
+                       db_used = true;
+               }
+               fprintf(*script, "  (oid=%s) %s.%s (%s.%s, NONE)\n",
+                               PQgetvalue(res, rowno, i_oproid),
+                               PQgetvalue(res, rowno, i_oprnsp),
+                               PQgetvalue(res, rowno, i_oprname),
+                               PQgetvalue(res, rowno, i_typnsp),
+                               PQgetvalue(res, rowno, i_typname));
+       }
+}
+
 /*
  * Verify that no user defined postfix operators exist.
  */
 static void
 check_for_user_defined_postfix_ops(ClusterInfo *cluster)
 {
-       int                     dbnum;
        FILE       *script = NULL;
        char            output_path[MAXPGPATH];
+       AsyncTask  *task = async_task_create();
+       const char *query;
+
+       /*
+        * The query below hardcodes FirstNormalObjectId as 16384 rather than
+        * interpolating that C #define into the query because, if that #define is
+        * ever changed, the cutoff we want to use is the value used by
+        * pre-version 14 servers, not that of some future version.
+        */
+       query = "SELECT o.oid AS oproid, "
+               "       n.nspname AS oprnsp, "
+               "       o.oprname, "
+               "       tn.nspname AS typnsp, "
+               "       t.typname "
+               "FROM pg_catalog.pg_operator o, "
+               "     pg_catalog.pg_namespace n, "
+               "     pg_catalog.pg_type t, "
+               "     pg_catalog.pg_namespace tn "
+               "WHERE o.oprnamespace = n.oid AND "
+               "      o.oprleft = t.oid AND "
+               "      t.typnamespace = tn.oid AND "
+               "      o.oprright = 0 AND "
+               "      o.oid >= 16384";
 
        prep_status("Checking for user-defined postfix operators");
 
@@ -1307,70 +1368,10 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
                         log_opts.basedir,
                         "postfix_ops.txt");
 
-       /* Find any user defined postfix operators */
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_oproid,
-                                       i_oprnsp,
-                                       i_oprname,
-                                       i_typnsp,
-                                       i_typname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /*
-                * The query below hardcodes FirstNormalObjectId as 16384 rather than
-                * interpolating that C #define into the query because, if that
-                * #define is ever changed, the cutoff we want to use is the value
-                * used by pre-version 14 servers, not that of some future version.
-                */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT o.oid AS oproid, "
-                                                               "       n.nspname AS oprnsp, "
-                                                               "       o.oprname, "
-                                                               "       tn.nspname AS typnsp, "
-                                                               "       t.typname "
-                                                               "FROM pg_catalog.pg_operator o, "
-                                                               "     pg_catalog.pg_namespace n, "
-                                                               "     pg_catalog.pg_type t, "
-                                                               "     pg_catalog.pg_namespace tn "
-                                                               "WHERE o.oprnamespace = n.oid AND "
-                                                               "      o.oprleft = t.oid AND "
-                                                               "      t.typnamespace = tn.oid AND "
-                                                               "      o.oprright = 0 AND "
-                                                               "      o.oid >= 16384");
-               ntups = PQntuples(res);
-               i_oproid = PQfnumber(res, "oproid");
-               i_oprnsp = PQfnumber(res, "oprnsp");
-               i_oprname = PQfnumber(res, "oprname");
-               i_typnsp = PQfnumber(res, "typnsp");
-               i_typname = PQfnumber(res, "typname");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL &&
-                               (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  (oid=%s) %s.%s (%s.%s, NONE)\n",
-                                       PQgetvalue(res, rowno, i_oproid),
-                                       PQgetvalue(res, rowno, i_oprnsp),
-                                       PQgetvalue(res, rowno, i_oprname),
-                                       PQgetvalue(res, rowno, i_typnsp),
-                                       PQgetvalue(res, rowno, i_typname));
-               }
-
-               PQclear(res);
-
-               PQfinish(conn);
-       }
+       async_task_add_step(task, query,
+                                               user_defined_postfix_ops_process, true, &script);
+       async_task_run(task, cluster);
+       async_task_free(task);
 
        if (script)
        {
-- 
2.39.3 (Apple Git-146)

>From 3c3b6bd7463d56de507da67fda62d241bb0a3a1a Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 8 Jul 2024 21:30:19 -0500
Subject: [PATCH v9 09/11] parallelize incompatible polymorphics check in
 pg_upgrade

---
 src/bin/pg_upgrade/check.c       | 155 ++++++++++++++++---------------
 src/tools/pgindent/typedefs.list |   1 +
 2 files changed, 81 insertions(+), 75 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 25cd10c000..af7d093581 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1387,6 +1387,38 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
                check_ok();
 }
 
+typedef struct incompat_polymorphics_state
+{
+       FILE       *script;
+       char            output_path[MAXPGPATH];
+} incompat_polymorphics_state;
+
+static void
+incompat_polymorphics_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       incompat_polymorphics_state *state = (incompat_polymorphics_state *) arg;
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       int                     i_objkind = PQfnumber(res, "objkind");
+       int                     i_objname = PQfnumber(res, "objname");
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (state->script == NULL &&
+                       (state->script = fopen_priv(state->output_path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", state->output_path);
+               if (!db_used)
+               {
+                       fprintf(state->script, "In database: %s\n", dbinfo->db_name);
+                       db_used = true;
+               }
+
+               fprintf(state->script, "  %s: %s\n",
+                               PQgetvalue(res, rowno, i_objkind),
+                               PQgetvalue(res, rowno, i_objname));
+       }
+}
+
 /*
  *     check_for_incompatible_polymorphics()
  *
@@ -1396,14 +1428,15 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
 static void
 check_for_incompatible_polymorphics(ClusterInfo *cluster)
 {
-       PGresult   *res;
-       FILE       *script = NULL;
-       char            output_path[MAXPGPATH];
        PQExpBufferData old_polymorphics;
+       AsyncTask  *task = async_task_create();
+       incompat_polymorphics_state state;
+       char       *query;
 
        prep_status("Checking for incompatible polymorphic functions");
 
-       snprintf(output_path, sizeof(output_path), "%s/%s",
+       state.script = NULL;
+       snprintf(state.output_path, sizeof(state.output_path), "%s/%s",
                         log_opts.basedir,
                         "incompatible_polymorphics.txt");
 
@@ -1427,80 +1460,51 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
                                                         ", 'array_positions(anyarray,anyelement)'"
                                                         ", 'width_bucket(anyelement,anyarray)'");
 
-       for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               bool            db_used = false;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-               int                     ntups;
-               int                     i_objkind,
-                                       i_objname;
-
-               /*
-                * The query below hardcodes FirstNormalObjectId as 16384 rather than
-                * interpolating that C #define into the query because, if that
-                * #define is ever changed, the cutoff we want to use is the value
-                * used by pre-version 14 servers, not that of some future version.
-                */
-               res = executeQueryOrDie(conn,
-               /* Aggregate transition functions */
-                                                               "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
-                                                               "FROM pg_proc AS p "
-                                                               "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
-                                                               "JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
-                                                               "WHERE p.oid >= 16384 "
-                                                               "AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
-                                                               "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
-
-               /* Aggregate final functions */
-                                                               "UNION ALL "
-                                                               "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
-                                                               "FROM pg_proc AS p "
-                                                               "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
-                                                               "JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
-                                                               "WHERE p.oid >= 16384 "
-                                                               "AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
-                                                               "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
-
-               /* Operators */
-                                                               "UNION ALL "
-                                                               "SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
-                                                               "FROM pg_operator AS op "
-                                                               "WHERE op.oid >= 16384 "
-                                                               "AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
-                                                               "AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[]);",
-                                                               old_polymorphics.data,
-                                                               old_polymorphics.data,
-                                                               old_polymorphics.data);
-
-               ntups = PQntuples(res);
-
-               i_objkind = PQfnumber(res, "objkind");
-               i_objname = PQfnumber(res, "objname");
-
-               for (int rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL &&
-                               (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", 
active_db->db_name);
-                               db_used = true;
-                       }
+       /*
+        * The query below hardcodes FirstNormalObjectId as 16384 rather than
+        * interpolating that C #define into the query because, if that #define is
+        * ever changed, the cutoff we want to use is the value used by
+        * pre-version 14 servers, not that of some future version.
+        */
 
-                       fprintf(script, "  %s: %s\n",
-                                       PQgetvalue(res, rowno, i_objkind),
-                                       PQgetvalue(res, rowno, i_objname));
-               }
+       /* Aggregate transition functions */
+       query = psprintf("SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
+                                        "FROM pg_proc AS p "
+                                        "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
+                                        "JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
+                                        "WHERE p.oid >= 16384 "
+                                        "AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
+                                        "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+       /* Aggregate final functions */
+                                        "UNION ALL "
+                                        "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
+                                        "FROM pg_proc AS p "
+                                        "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
+                                        "JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
+                                        "WHERE p.oid >= 16384 "
+                                        "AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
+                                        "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+       /* Operators */
+                                        "UNION ALL "
+                                        "SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
+                                        "FROM pg_operator AS op "
+                                        "WHERE op.oid >= 16384 "
+                                        "AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
+                                        "AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[]);",
+                                        old_polymorphics.data,
+                                        old_polymorphics.data,
+                                        old_polymorphics.data);
 
-               PQclear(res);
-               PQfinish(conn);
-       }
+       async_task_add_step(task, query,
+                                               incompat_polymorphics_process, true, &state);
+       async_task_run(task, cluster);
+       async_task_free(task);
 
-       if (script)
+       if (state.script)
        {
-               fclose(script);
+               fclose(state.script);
                pg_log(PG_REPORT, "fatal");
                pg_fatal("Your installation contains user-defined objects that 
refer to internal\n"
                                 "polymorphic functions with arguments of type 
\"anyarray\" or \"anyelement\".\n"
@@ -1508,12 +1512,13 @@ check_for_incompatible_polymorphics(ClusterInfo 
*cluster)
                                 "afterwards, changing them to refer to the new 
corresponding functions with\n"
                                 "arguments of type \"anycompatiblearray\" and 
\"anycompatible\".\n"
                                 "A list of the problematic objects is in the 
file:\n"
-                                "    %s", output_path);
+                                "    %s", state.output_path);
        }
        else
                check_ok();
 
        termPQExpBuffer(&old_polymorphics);
+       pg_free(query);
 }
 
 /*
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index ec8106329d..03be80931e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3545,6 +3545,7 @@ hstoreUpgrade_t
 hyperLogLogState
 ifState
 import_error_callback_arg
+incompat_polymorphics_state
 indexed_tlist
 inet
 inetKEY
-- 
2.39.3 (Apple Git-146)

>From 3f0bc8b9fdb1b7f664fb3ae546a4664b271378ce Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 8 Jul 2024 21:42:22 -0500
Subject: [PATCH v9 10/11] parallelize tables with oids check in pg_upgrade

---
 src/bin/pg_upgrade/check.c | 85 +++++++++++++++++++-------------------
 1 file changed, 43 insertions(+), 42 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index af7d093581..4156257843 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1521,15 +1521,53 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
        pg_free(query);
 }
 
+static void
+with_oids_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       char            output_path[MAXPGPATH];
+       int                     i_nspname = PQfnumber(res, "nspname");
+       int                     i_relname = PQfnumber(res, "relname");
+       FILE      **script = (FILE **) arg;
+
+       if (!ntups)
+               return;
+
+       snprintf(output_path, sizeof(output_path), "%s/%s",
+                        log_opts.basedir,
+                        "tables_with_oids.txt");
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (*script == NULL && (*script = fopen_priv(output_path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", output_path);
+               if (!db_used)
+               {
+                       fprintf(*script, "In database: %s\n", dbinfo->db_name);
+                       db_used = true;
+               }
+               fprintf(*script, "  %s.%s\n",
+                               PQgetvalue(res, rowno, i_nspname),
+                               PQgetvalue(res, rowno, i_relname));
+       }
+}
+
 /*
  * Verify that no tables are declared WITH OIDS.
  */
 static void
 check_for_tables_with_oids(ClusterInfo *cluster)
 {
-       int                     dbnum;
        FILE       *script = NULL;
        char            output_path[MAXPGPATH];
+       AsyncTask  *task = async_task_create();
+       const char *query = "SELECT n.nspname, c.relname "
+               "FROM   pg_catalog.pg_class c, "
+               "       pg_catalog.pg_namespace n "
+               "WHERE  c.relnamespace = n.oid AND "
+               "       c.relhasoids AND"
+               "       n.nspname NOT IN ('pg_catalog')";
 
        prep_status("Checking for tables WITH OIDS");
 
@@ -1537,47 +1575,10 @@ check_for_tables_with_oids(ClusterInfo *cluster)
                         log_opts.basedir,
                         "tables_with_oids.txt");
 
-       /* Find any tables declared WITH OIDS */
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_nspname,
-                                       i_relname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               res = executeQueryOrDie(conn,
-                                                               "SELECT n.nspname, c.relname "
-                                                               "FROM   pg_catalog.pg_class c, "
-                                                               "               pg_catalog.pg_namespace n "
-                                                               "WHERE  c.relnamespace = n.oid AND "
-                                                               "               c.relhasoids AND"
-                                                               "       n.nspname NOT IN ('pg_catalog')");
-
-               ntups = PQntuples(res);
-               i_nspname = PQfnumber(res, "nspname");
-               i_relname = PQfnumber(res, "relname");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  %s.%s\n",
-                                       PQgetvalue(res, rowno, i_nspname),
-                                       PQgetvalue(res, rowno, i_relname));
-               }
-
-               PQclear(res);
-
-               PQfinish(conn);
-       }
+       async_task_add_step(task, query,
+                                               with_oids_process, true, &script);
+       async_task_run(task, cluster);
+       async_task_free(task);
 
        if (script)
        {
-- 
2.39.3 (Apple Git-146)

>From 7a665465afe5fbe027df70467d518ddc235c33b7 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 8 Jul 2024 21:52:13 -0500
Subject: [PATCH v9 11/11] parallelize user defined encoding conversions check
 in pg_upgrade

---
 src/bin/pg_upgrade/check.c | 102 +++++++++++++++++++------------------
 1 file changed, 53 insertions(+), 49 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 4156257843..6910d18713 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1652,15 +1652,51 @@ check_for_pg_role_prefix(ClusterInfo *cluster)
                check_ok();
 }
 
+static void
+user_defined_encoding_conversions_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       FILE      **script = (FILE **) arg;
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       char            output_path[MAXPGPATH];
+       int                     i_conoid = PQfnumber(res, "conoid");
+       int                     i_conname = PQfnumber(res, "conname");
+       int                     i_nspname = PQfnumber(res, "nspname");
+
+       if (!ntups)
+               return;
+
+       snprintf(output_path, sizeof(output_path), "%s/%s",
+                        log_opts.basedir,
+                        "encoding_conversions.txt");
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (*script == NULL &&
+                       (*script = fopen_priv(output_path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", output_path);
+               if (!db_used)
+               {
+                       fprintf(*script, "In database: %s\n", dbinfo->db_name);
+                       db_used = true;
+               }
+               fprintf(*script, "  (oid=%s) %s.%s\n",
+                               PQgetvalue(res, rowno, i_conoid),
+                               PQgetvalue(res, rowno, i_nspname),
+                               PQgetvalue(res, rowno, i_conname));
+       }
+}
+
 /*
  * Verify that no user-defined encoding conversions exist.
  */
 static void
 check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
 {
-       int                     dbnum;
        FILE       *script = NULL;
        char            output_path[MAXPGPATH];
+       AsyncTask  *task = async_task_create();
+       const char *query;
 
        prep_status("Checking for user-defined encoding conversions");
 
@@ -1668,55 +1704,23 @@ check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
                         log_opts.basedir,
                         "encoding_conversions.txt");
 
-       /* Find any user defined encoding conversions */
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_conoid,
-                                       i_conname,
-                                       i_nspname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /*
-                * The query below hardcodes FirstNormalObjectId as 16384 rather than
-                * interpolating that C #define into the query because, if that
-                * #define is ever changed, the cutoff we want to use is the value
-                * used by pre-version 14 servers, not that of some future version.
-                */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT c.oid as conoid, c.conname, n.nspname "
-                                                               "FROM pg_catalog.pg_conversion c, "
-                                                               "     pg_catalog.pg_namespace n "
-                                                               "WHERE c.connamespace = n.oid AND "
-                                                               "      c.oid >= 16384");
-               ntups = PQntuples(res);
-               i_conoid = PQfnumber(res, "conoid");
-               i_conname = PQfnumber(res, "conname");
-               i_nspname = PQfnumber(res, "nspname");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL &&
-                               (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  (oid=%s) %s.%s\n",
-                                       PQgetvalue(res, rowno, i_conoid),
-                                       PQgetvalue(res, rowno, i_nspname),
-                                       PQgetvalue(res, rowno, i_conname));
-               }
-
-               PQclear(res);
+       /*
+        * The query below hardcodes FirstNormalObjectId as 16384 rather than
+        * interpolating that C #define into the query because, if that #define is
+        * ever changed, the cutoff we want to use is the value used by
+        * pre-version 14 servers, not that of some future version.
+        */
+       query = "SELECT c.oid as conoid, c.conname, n.nspname "
+               "FROM pg_catalog.pg_conversion c, "
+               "     pg_catalog.pg_namespace n "
+               "WHERE c.connamespace = n.oid AND "
+               "      c.oid >= 16384";
 
-               PQfinish(conn);
-       }
+       async_task_add_step(task, query,
+                                               user_defined_encoding_conversions_process, true,
+                                               &script);
+       async_task_run(task, cluster);
+       async_task_free(task);
 
        if (script)
        {
-- 
2.39.3 (Apple Git-146)
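
A note for anyone converting further checks to this framework: because the
processing callback now runs once per database against a shared report file,
these patches thread a FILE ** through the "arg" pointer and open the file
lazily, only when an offending object is actually found.  A minimal sketch of
that callback contract follows (not part of the patch set; the file name and
result column are hypothetical placeholders):

	static void
	my_check_process(DbInfo *dbinfo, PGresult *res, void *arg)
	{
		FILE	  **script = (FILE **) arg;	/* shared across all databases */
		bool		db_used = false;	/* header printed for this db yet? */

		for (int rowno = 0; rowno < PQntuples(res); rowno++)
		{
			/* open the report file only once an offending object shows up */
			if (*script == NULL &&
				(*script = fopen_priv("my_check.txt", "w")) == NULL)
				pg_fatal("could not open file \"%s\": %m", "my_check.txt");
			if (!db_used)
			{
				fprintf(*script, "In database: %s\n", dbinfo->db_name);
				db_used = true;
			}
			fprintf(*script, "  %s\n", PQgetvalue(res, rowno, 0));
		}
	}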
