I spent some time cleaning up names, comments, etc.  Barring additional
feedback, I'm planning to commit this stuff in the September commitfest so
that it has plenty of time to bake in the buildfarm.

-- 
nathan
>From c38137cd2cbc45c348e8e144f864a62c08bd7b7f Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Wed, 28 Aug 2024 10:45:59 -0500
Subject: [PATCH v10 01/11] Introduce framework for parallelizing various
 pg_upgrade tasks.

A number of pg_upgrade steps require connecting to every database
in the cluster and running the same query in each one.  When there
are many databases, these steps are particularly time-consuming,
especially since these steps are performed sequentially in a single
process.

This commit introduces a new framework that makes it easy to
parallelize most of these once-in-each-database tasks.
Specifically, it manages a simple state machine of slots and uses
libpq's asynchronous APIs to establish the connections and run the
queries.  The --jobs option is used to determine the number of
slots to use.  To use this new task framework, callers simply need
to provide the query and a callback function to process its
results, and the framework takes care of the rest.  A more complete
description is provided at the top of the new task.c file.

None of the eligible once-in-each-database tasks are converted to
use this new framework in this commit.  That will be done via
several follow-up commits.

Reviewed-by: Jeff Davis, Robert Haas, Daniel Gustafsson, Ilya Gladyshev, Corey Huinker
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
 src/bin/pg_upgrade/Makefile      |   1 +
 src/bin/pg_upgrade/meson.build   |   1 +
 src/bin/pg_upgrade/pg_upgrade.h  |  21 ++
 src/bin/pg_upgrade/task.c        | 426 +++++++++++++++++++++++++++++++
 src/tools/pgindent/typedefs.list |   5 +
 5 files changed, 454 insertions(+)
 create mode 100644 src/bin/pg_upgrade/task.c

diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile
index bde91e2beb..f83d2b5d30 100644
--- a/src/bin/pg_upgrade/Makefile
+++ b/src/bin/pg_upgrade/Makefile
@@ -25,6 +25,7 @@ OBJS = \
        relfilenumber.o \
        server.o \
        tablespace.o \
+       task.o \
        util.o \
        version.o
 
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 9825fa3305..3d88419674 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -14,6 +14,7 @@ pg_upgrade_sources = files(
   'relfilenumber.c',
   'server.c',
   'tablespace.c',
+  'task.c',
   'util.c',
   'version.c',
 )
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index cdb6e2b759..53f693c2d4 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -494,3 +494,24 @@ void               parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr
                                                                                  char *old_pgdata, char *new_pgdata,
                                                                                  char *old_tablespace);
 bool           reap_child(bool wait_for_child);
+
+/* task.c */
+
+typedef void (*UpgradeTaskProcessCB) (DbInfo *dbinfo, PGresult *res, void *arg);
+
+/* struct definition is private to task.c */
+typedef struct UpgradeTask UpgradeTask;
+
+UpgradeTask *upgrade_task_create(void);
+void           upgrade_task_add_step(UpgradeTask *task, const char *query,
+                                                                 UpgradeTaskProcessCB process_cb, bool free_result,
+                                                                 void *arg);
+void           upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster);
+void           upgrade_task_free(UpgradeTask *task);
+
+/* convenient type for common private data needed by several tasks */
+typedef struct
+{
+       FILE       *file;
+       char            path[MAXPGPATH];
+} UpgradeTaskReport;
diff --git a/src/bin/pg_upgrade/task.c b/src/bin/pg_upgrade/task.c
new file mode 100644
index 0000000000..b5e2455ae2
--- /dev/null
+++ b/src/bin/pg_upgrade/task.c
@@ -0,0 +1,426 @@
+/*
+ * task.c
+ *             framework for parallelizing pg_upgrade's once-in-each-database tasks
+ *
+ * This framework provides an efficient way of running the various
+ * once-in-each-database tasks required by pg_upgrade.  Specifically, it
+ * parallelizes these tasks by managing a simple state machine of
+ * user_opts.jobs slots and using libpq's asynchronous APIs to establish the
+ * connections and run the queries.  Callers simply need to create a callback
+ * function and build/execute an UpgradeTask.  A simple example follows:
+ *
+ *             static void
+ *             my_process_cb(DbInfo *dbinfo, PGresult *res, void *arg)
+ *             {
+ *                     for (int i = 0; i < PQntuples(res); i++)
+ *                     {
+ *                             ... process results ...
+ *                     }
+ *             }
+ *
+ *             void
+ *             my_task(ClusterInfo *cluster)
+ *             {
+ *                     UpgradeTask *task = upgrade_task_create();
+ *
+ *                     upgrade_task_add_step(task,
+ *                                                               "... query text ...",
+ *                                                               my_process_cb,
+ *                                                               true,         // let the task free the PGresult
+ *                                                               NULL);        // "arg" pointer for callback
+ *                     upgrade_task_run(task, cluster);
+ *                     upgrade_task_free(task);
+ *             }
+ *
+ * Note that multiple steps can be added to a given task.  When there are
+ * multiple steps, the task will run all of the steps consecutively in the same
+ * database connection before freeing the connection and moving on.  In other
+ * words, it only ever initiates one connection to each database in the
+ * cluster for a given run.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ * src/bin/pg_upgrade/task.c
+ */
+
+#include "postgres_fe.h"
+
+#include "common/connect.h"
+#include "fe_utils/string_utils.h"
+#include "pg_upgrade.h"
+
+/*
+ * dbs_complete stores the number of databases that we have completed
+ * processing.  When this value equals the number of databases in the cluster,
+ * the task is finished.
+ */
+static int     dbs_complete;
+
+/*
+ * dbs_processing stores the index of the next database in the cluster's array
+ * of databases that will be picked up for processing.  It will always be
+ * greater than or equal to dbs_complete.
+ */
+static int     dbs_processing;
+
+/*
+ * This struct stores all the information for a single step of a task.  All
+ * steps in a task are run in a single connection before moving on to the next
+ * database (which requires a new connection).
+ */
+typedef struct UpgradeTaskStep
+{
+       UpgradeTaskProcessCB process_cb;        /* processes the results of the query */
+       const char *query;                      /* query text */
+       bool            free_result;    /* should we free the result? */
+       void       *arg;                        /* pointer passed to process_cb */
+} UpgradeTaskStep;
+
+/*
+ * This struct is a thin wrapper around an array of steps, i.e.,
+ * UpgradeTaskStep.
+ */
+typedef struct UpgradeTask
+{
+       UpgradeTaskStep *steps;
+       int                     num_steps;
+} UpgradeTask;
+
+/*
+ * The different states for a parallel slot.
+ */
+typedef enum
+{
+       FREE,                                           /* slot available for use in a new database */
+       CONNECTING,                                     /* waiting for connection to be established */
+       RUNNING_QUERIES,                        /* running/processing queries in the task */
+} UpgradeTaskSlotState;
+
+/*
+ * We maintain an array of user_opts.jobs slots to execute the task.
+ */
+typedef struct
+{
+       UpgradeTaskSlotState state; /* state of the slot */
+       int                     db_idx;                 /* index of the database assigned to slot */
+       int                     step_idx;               /* index of the current step of task */
+       PGconn     *conn;                       /* current connection managed by slot */
+} UpgradeTaskSlot;
+
+/*
+ * Initializes an UpgradeTask.
+ */
+UpgradeTask *
+upgrade_task_create(void)
+{
+       UpgradeTask *task = pg_malloc0(sizeof(UpgradeTask));
+
+       /* All tasks must first set a secure search_path. */
+       upgrade_task_add_step(task, ALWAYS_SECURE_SEARCH_PATH_SQL, NULL, true, NULL);
+       return task;
+}
+
+/*
+ * Frees all storage associated with an UpgradeTask.
+ */
+void
+upgrade_task_free(UpgradeTask *task)
+{
+       if (task->steps)
+               pg_free(task->steps);
+
+       pg_free(task);
+}
+
+/*
+ * Adds a step to an UpgradeTask.  The steps will be executed in each database
+ * in the order in which they are added.
+ *
+ *     task: task object that must have been initialized via upgrade_task_create()
+ *     query: the query text
+ *     process_cb: function that processes the results of the query
+ *     free_result: should we free the PGresult, or leave it to the caller?
+ *     arg: pointer to task-specific data that is passed to each callback
+ */
+void
+upgrade_task_add_step(UpgradeTask *task, const char *query,
+                                         UpgradeTaskProcessCB process_cb, bool free_result,
+                                         void *arg)
+{
+       UpgradeTaskStep *new_step;
+
+       task->steps = pg_realloc(task->steps,
+                                                        ++task->num_steps * sizeof(UpgradeTaskStep));
+
+       new_step = &task->steps[task->num_steps - 1];
+       new_step->process_cb = process_cb;
+       new_step->query = query;
+       new_step->free_result = free_result;
+       new_step->arg = arg;
+}
+
+/*
+ * Build a connection string for the slot's current database and asynchronously
+ * start a new connection, but do not wait for the connection to be
+ * established.
+ */
+static void
+start_conn(const ClusterInfo *cluster, UpgradeTaskSlot *slot)
+{
+       PQExpBufferData conn_opts;
+       DbInfo     *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
+
+       /* Build connection string with proper quoting */
+       initPQExpBuffer(&conn_opts);
+       appendPQExpBufferStr(&conn_opts, "dbname=");
+       appendConnStrVal(&conn_opts, dbinfo->db_name);
+       appendPQExpBufferStr(&conn_opts, " user=");
+       appendConnStrVal(&conn_opts, os_info.user);
+       appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
+       if (cluster->sockdir)
+       {
+               appendPQExpBufferStr(&conn_opts, " host=");
+               appendConnStrVal(&conn_opts, cluster->sockdir);
+       }
+
+       slot->conn = PQconnectStart(conn_opts.data);
+
+       if (!slot->conn)
+               pg_fatal("failed to create connection with connection string: \"%s\"",
+                                conn_opts.data);
+
+       termPQExpBuffer(&conn_opts);
+}
+
+/*
+ * Run the process_cb callback function to process the result of a query, and
+ * free the result if the caller indicated we should do so.
+ */
+static void
+process_query_result(const ClusterInfo *cluster, UpgradeTaskSlot *slot,
+                                        const UpgradeTask *task)
+{
+       UpgradeTaskStep *steps = &task->steps[slot->step_idx];
+       UpgradeTaskProcessCB process_cb = steps->process_cb;
+       DbInfo     *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
+       PGresult   *res = PQgetResult(slot->conn);
+
+       if (PQstatus(slot->conn) == CONNECTION_BAD ||
+               (PQresultStatus(res) != PGRES_TUPLES_OK &&
+                PQresultStatus(res) != PGRES_COMMAND_OK))
+               pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
+
+       /*
+        * We assume that a NULL process_cb callback function means there's
+        * nothing to process.  This is primarily intended for the initial step in
+        * every task that sets a safe search_path.
+        */
+       if (process_cb)
+               (*process_cb) (dbinfo, res, steps->arg);
+
+       if (steps->free_result)
+               PQclear(res);
+}
+
+/*
+ * Advances the state machine for a given slot as necessary.
+ */
+static void
+process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
+{
+       PQExpBufferData queries;
+
+       switch (slot->state)
+       {
+               case FREE:
+
+                       /*
+                        * If all of the databases in the cluster have been processed or
+                        * are currently being processed by other slots, we are done.
+                        */
+                       if (dbs_processing >= cluster->dbarr.ndbs)
+                               return;
+
+                       /*
+                        * Claim the next database in the cluster's array and initiate a
+                        * new connection.
+                        */
+                       slot->db_idx = dbs_processing++;
+                       slot->state = CONNECTING;
+                       start_conn(cluster, slot);
+
+                       return;
+
+               case CONNECTING:
+
+                       /* Check for connection failure. */
+                       if (PQconnectPoll(slot->conn) == PGRES_POLLING_FAILED)
+                               pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
+
+                       /* Check whether the connection is still establishing. */
+                       if (PQconnectPoll(slot->conn) != PGRES_POLLING_OK)
+                               return;
+
+                       /*
+                        * Move on to running/processing the queries in the task.  We
+                        * combine all the queries and send them to the server together.
+                        */
+                       slot->state = RUNNING_QUERIES;
+                       initPQExpBuffer(&queries);
+                       for (int i = 0; i < task->num_steps; i++)
+                               appendPQExpBuffer(&queries, "%s;", task->steps[i].query);
+                       if (!PQsendQuery(slot->conn, queries.data))
+                               pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
+                       termPQExpBuffer(&queries);
+
+                       return;
+
+               case RUNNING_QUERIES:
+
+                       /*
+                        * Consume any available data and clear the read-ready indicator
+                        * for the connection.
+                        */
+                       if (!PQconsumeInput(slot->conn))
+                               pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
+
+                       /*
+                        * Process any results that are ready so that we can free up this
+                        * slot for another database as soon as possible.
+                        */
+                       for (; slot->step_idx < task->num_steps; slot->step_idx++)
+                       {
+                               /* If no more results are available yet, move on. */
+                               if (PQisBusy(slot->conn))
+                                       return;
+
+                               process_query_result(cluster, slot, task);
+                       }
+
+                       /*
+                        * If we just finished processing the result of the last step in
+                        * the task, free the slot.  We recursively call this function on
+                        * the newly-freed slot so that we can start initiating the next
+                        * connection immediately instead of waiting for the next loop
+                        * through the slots.
+                        */
+                       dbs_complete++;
+                       (void) PQgetResult(slot->conn);
+                       PQfinish(slot->conn);
+                       memset(slot, 0, sizeof(UpgradeTaskSlot));
+
+                       process_slot(cluster, slot, task);
+
+                       return;
+       }
+}
+
+/*
+ * Wait on the slots to either finish connecting or to receive query results if
+ * possible.  This avoids a tight loop in upgrade_task_run().
+ */
+static void
+wait_on_slots(UpgradeTaskSlot *slots, int numslots)
+{
+       fd_set          input_mask;
+       fd_set          output_mask;
+       fd_set          except_mask;
+       int                     maxFd = 0;
+
+       FD_ZERO(&input_mask);
+       FD_ZERO(&output_mask);
+       FD_ZERO(&except_mask);
+
+       for (int i = 0; i < numslots; i++)
+       {
+               int                     sock;
+               bool            read = false;
+
+               switch (slots[i].state)
+               {
+                       case FREE:
+
+                               /*
+                                * This function should only ever see free slots as we are
+                                * finishing processing the last few databases, at which point
+                                * we don't have any databases left for them to process. We'll
+                                * never use these slots again, so we can safely ignore them.
+                                */
+                               continue;
+
+                       case CONNECTING:
+
+                               /*
+                                * If we are waiting for the connection to establish, choose
+                                * whether to wait for reading or for writing on the socket as
+                                * appropriate.  If neither applies, just return immediately so
+                                * that we can handle the slot.
+                                */
+                               {
+                                       PostgresPollingStatusType status;
+
+                                       status = PQconnectPoll(slots[i].conn);
+                                       if (status == PGRES_POLLING_READING)
+                                               read = true;
+                                       else if (status != PGRES_POLLING_WRITING)
+                                               return;
+                               }
+                               break;
+
+                       case RUNNING_QUERIES:
+
+                               /*
+                                * Once we've sent the queries, we must wait for the socket to
+                                * be read-ready.  Note that process_slot() handles calling
+                                * PQconsumeInput() as required.
+                                */
+                               read = true;
+                               break;
+               }
+
+               /*
+                * If there's some problem retrieving the socket, just pretend this
+                * slot doesn't exist.  We don't expect this to happen regularly in
+                * practice, so it seems unlikely to cause too much harm.
+                */
+               sock = PQsocket(slots[i].conn);
+               if (sock < 0)
+                       continue;
+
+               /*
+                * Add the socket to the set.
+                */
+               FD_SET(sock, read ? &input_mask : &output_mask);
+               FD_SET(sock, &except_mask);
+               maxFd = Max(maxFd, sock);
+       }
+
+       /*
+        * If we found socket(s) to wait on, wait.
+        */
+       if (maxFd != 0)
+               (void) select(maxFd + 1, &input_mask, &output_mask, &except_mask, NULL);
+}
+
+/*
+ * Runs all the steps of the task in every database in the cluster using
+ * user_opts.jobs parallel slots.
+ */
+void
+upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster)
+{
+       int                     jobs = Max(1, user_opts.jobs);
+       UpgradeTaskSlot *slots = pg_malloc0(sizeof(UpgradeTaskSlot) * jobs);
+
+       dbs_complete = 0;
+       dbs_processing = 0;
+
+       while (dbs_complete < cluster->dbarr.ndbs)
+       {
+               for (int i = 0; i < jobs; i++)
+                       process_slot(cluster, &slots[i], task);
+
+               wait_on_slots(slots, jobs);
+       }
+
+       pg_free(slots);
+}
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9e951a9e6f..43c0f9f85b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3039,6 +3039,11 @@ UnresolvedTup
 UnresolvedTupData
 UpdateContext
 UpdateStmt
+UpgradeTask
+UpgradeTaskReport
+UpgradeTaskSlot
+UpgradeTaskSlotState
+UpgradeTaskStep
 UploadManifestCmd
 UpperRelationKind
 UpperUniquePath
-- 
2.39.3 (Apple Git-146)
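
To make the API concrete, here is a minimal sketch of a caller written
against the framework above.  The function names and the query are
hypothetical and for illustration only (the follow-up patches contain the
real conversions); it assumes pg_upgrade.h and the task framework from 0001:

    /* hypothetical callback: accumulate per-database table counts via "arg" */
    static void
    count_tables_cb(DbInfo *dbinfo, PGresult *res, void *arg)
    {
        int        *total = (int *) arg;

        AssertVariableIsOfType(&count_tables_cb, UpgradeTaskProcessCB);

        *total += atoi(PQgetvalue(res, 0, 0));
    }

    /* hypothetical task: count user tables in every database in the cluster */
    static void
    count_user_tables(ClusterInfo *cluster)
    {
        int         total = 0;
        UpgradeTask *task = upgrade_task_create();

        upgrade_task_add_step(task,
                              "SELECT count(*) FROM pg_catalog.pg_class "
                              "WHERE relkind = 'r'",
                              count_tables_cb,
                              true,     /* let the task free the PGresult */
                              &total);
        upgrade_task_run(task, cluster);    /* connects with --jobs slots */
        upgrade_task_free(task);
    }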

>From a1f2ff2ea0a6f9e0eb9af0ec1e400dbc048079cb Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 17:21:19 -0500
Subject: [PATCH v10 02/11] Use pg_upgrade's new parallel framework for
 subscription checks.

Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
 src/bin/pg_upgrade/check.c | 206 ++++++++++++++++++++-----------------
 1 file changed, 111 insertions(+), 95 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 96adea41e9..f8160e0140 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1905,6 +1905,38 @@ check_old_cluster_for_valid_slots(void)
        check_ok();
 }
 
+/*
+ * Callback function for processing results of query for
+ * check_old_cluster_subscription_state()'s UpgradeTask.  If the query returned
+ * any rows (i.e., the check failed), write the details to the report file.
+ */
+static void
+process_old_sub_state_check(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+       int                     ntup = PQntuples(res);
+       int                     i_srsubstate = PQfnumber(res, "srsubstate");
+       int                     i_subname = PQfnumber(res, "subname");
+       int                     i_nspname = PQfnumber(res, "nspname");
+       int                     i_relname = PQfnumber(res, "relname");
+
+       AssertVariableIsOfType(&process_old_sub_state_check, UpgradeTaskProcessCB);
+
+       for (int i = 0; i < ntup; i++)
+       {
+               if (report->file == NULL &&
+                       (report->file = fopen_priv(report->path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", report->path);
+
+               fprintf(report->file, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
+                               PQgetvalue(res, i, i_srsubstate),
+                               dbinfo->db_name,
+                               PQgetvalue(res, i, i_subname),
+                               PQgetvalue(res, i, i_nspname),
+                               PQgetvalue(res, i, i_relname));
+       }
+}
+
 /*
  * check_old_cluster_subscription_state()
  *
@@ -1915,115 +1947,99 @@ check_old_cluster_for_valid_slots(void)
 static void
 check_old_cluster_subscription_state(void)
 {
-       FILE       *script = NULL;
-       char            output_path[MAXPGPATH];
+       UpgradeTask *task = upgrade_task_create();
+       UpgradeTaskReport report;
+       const char *query;
+       PGresult   *res;
+       PGconn     *conn;
        int                     ntup;
 
        prep_status("Checking for subscription state");
 
-       snprintf(output_path, sizeof(output_path), "%s/%s",
+       report.file = NULL;
+       snprintf(report.path, sizeof(report.path), "%s/%s",
                         log_opts.basedir,
                         "subs_invalid.txt");
-       for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               DbInfo     *active_db = &old_cluster.dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(&old_cluster, active_db->db_name);
-
-               /* We need to check for pg_replication_origin only once. */
-               if (dbnum == 0)
-               {
-                       /*
-                        * Check that all the subscriptions have their respective
-                        * replication origin.
-                        */
-                       res = executeQueryOrDie(conn,
-                                                                       "SELECT d.datname, s.subname "
-                                                                       "FROM pg_catalog.pg_subscription s "
-                                                                       "LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
-                                                                       "       ON o.roname = 'pg_' || s.oid "
-                                                                       "INNER JOIN pg_catalog.pg_database d "
-                                                                       "       ON d.oid = s.subdbid "
-                                                                       "WHERE o.roname IS NULL;");
-
-                       ntup = PQntuples(res);
-                       for (int i = 0; i < ntup; i++)
-                       {
-                               if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-                                       pg_fatal("could not open file \"%s\": %m", output_path);
-                               fprintf(script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n",
-                                               PQgetvalue(res, i, 0),
-                                               PQgetvalue(res, i, 1));
-                       }
-                       PQclear(res);
-               }
-
-               /*
-                * We don't allow upgrade if there is a risk of dangling slot or
-                * origin corresponding to initial sync after upgrade.
-                *
-                * A slot/origin not created yet refers to the 'i' (initialize) state,
-                * while 'r' (ready) state refers to a slot/origin created previously
-                * but already dropped. These states are supported for pg_upgrade. The
-                * other states listed below are not supported:
-                *
-                * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state
-                * would retain a replication slot, which could not be dropped by the
-                * sync worker spawned after the upgrade because the subscription ID
-                * used for the slot name won't match anymore.
-                *
-                * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state
-                * would retain the replication origin when there is a failure in
-                * tablesync worker immediately after dropping the replication slot in
-                * the publisher.
-                *
-                * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on
-                * a relation upgraded while in this state would expect an origin ID
-                * with the OID of the subscription used before the upgrade, causing
-                * it to fail.
-                *
-                * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
-                * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog,
-                * so we need not allow these states.
-                */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT r.srsubstate, s.subname, n.nspname, c.relname "
-                                                               "FROM pg_catalog.pg_subscription_rel r "
-                                                               "LEFT JOIN pg_catalog.pg_subscription s"
-                                                               "       ON r.srsubid = s.oid "
-                                                               "LEFT JOIN pg_catalog.pg_class c"
-                                                               "       ON r.srrelid = c.oid "
-                                                               "LEFT JOIN pg_catalog.pg_namespace n"
-                                                               "       ON c.relnamespace = n.oid "
-                                                               "WHERE r.srsubstate NOT IN ('i', 'r') "
-                                                               "ORDER BY s.subname");
-
-               ntup = PQntuples(res);
-               for (int i = 0; i < ntup; i++)
-               {
-                       if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", output_path);
-
-                       fprintf(script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
-                                       PQgetvalue(res, i, 0),
-                                       active_db->db_name,
-                                       PQgetvalue(res, i, 1),
-                                       PQgetvalue(res, i, 2),
-                                       PQgetvalue(res, i, 3));
-               }
 
-               PQclear(res);
-               PQfinish(conn);
+       /*
+        * Check that all the subscriptions have their respective replication
+        * origin.  This check only needs to run once.
+        */
+       conn = connectToServer(&old_cluster, old_cluster.dbarr.dbs[0].db_name);
+       res = executeQueryOrDie(conn,
+                                                       "SELECT d.datname, s.subname "
+                                                       "FROM pg_catalog.pg_subscription s "
+                                                       "LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
+                                                       "       ON o.roname = 'pg_' || s.oid "
+                                                       "INNER JOIN pg_catalog.pg_database d "
+                                                       "       ON d.oid = s.subdbid "
+                                                       "WHERE o.roname IS NULL;");
+       ntup = PQntuples(res);
+       for (int i = 0; i < ntup; i++)
+       {
+               if (report.file == NULL &&
+                       (report.file = fopen_priv(report.path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", report.path);
+               fprintf(report.file, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n",
+                               PQgetvalue(res, i, 0),
+                               PQgetvalue(res, i, 1));
        }
+       PQclear(res);
+       PQfinish(conn);
 
-       if (script)
+       /*
+        * We don't allow upgrade if there is a risk of dangling slot or origin
+        * corresponding to initial sync after upgrade.
+        *
+        * A slot/origin not created yet refers to the 'i' (initialize) state,
+        * while 'r' (ready) state refers to a slot/origin created previously but
+        * already dropped. These states are supported for pg_upgrade. The other
+        * states listed below are not supported:
+        *
+        * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state would
+        * retain a replication slot, which could not be dropped by the sync
+        * worker spawned after the upgrade because the subscription ID used for
+        * the slot name won't match anymore.
+        *
+        * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state would
+        * retain the replication origin when there is a failure in tablesync
+        * worker immediately after dropping the replication slot in the
+        * publisher.
+        *
+        * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on a
+        * relation upgraded while in this state would expect an origin ID with
+        * the OID of the subscription used before the upgrade, causing it to
+        * fail.
+        *
+        * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
+        * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog, so we
+        * need not allow these states.
+        */
+       query = "SELECT r.srsubstate, s.subname, n.nspname, c.relname "
+               "FROM pg_catalog.pg_subscription_rel r "
+               "LEFT JOIN pg_catalog.pg_subscription s"
+               "   ON r.srsubid = s.oid "
+               "LEFT JOIN pg_catalog.pg_class c"
+               "   ON r.srrelid = c.oid "
+               "LEFT JOIN pg_catalog.pg_namespace n"
+               "   ON c.relnamespace = n.oid "
+               "WHERE r.srsubstate NOT IN ('i', 'r') "
+               "ORDER BY s.subname";
+
+       upgrade_task_add_step(task, query, process_old_sub_state_check,
+                                                 true, &report);
+
+       upgrade_task_run(task, &old_cluster);
+       upgrade_task_free(task);
+
+       if (report.file)
        {
-               fclose(script);
+               fclose(report.file);
                pg_log(PG_REPORT, "fatal");
                pg_fatal("Your installation contains subscriptions without 
origin or having relations not in i (initialize) or r (ready) state.\n"
                                 "You can allow the initial sync to finish for 
all relations and then restart the upgrade.\n"
                                 "A list of the problematic subscriptions is in 
the file:\n"
-                                "    %s", output_path);
+                                "    %s", report.path);
        }
        else
                check_ok();
-- 
2.39.3 (Apple Git-146)
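
One pattern worth calling out in the conversion above: the callback opens the
report file lazily, so subs_invalid.txt is only created when the check
actually finds a problem.  In skeleton form (the names here are hypothetical;
process_old_sub_state_check() above is the real instance):

    /* hypothetical check callback: one report line per failing row */
    static void
    process_my_check(DbInfo *dbinfo, PGresult *res, void *arg)
    {
        UpgradeTaskReport *report = (UpgradeTaskReport *) arg;

        AssertVariableIsOfType(&process_my_check, UpgradeTaskProcessCB);

        for (int i = 0; i < PQntuples(res); i++)
        {
            /* open the report file on the first failing row only */
            if (report->file == NULL &&
                (report->file = fopen_priv(report->path, "w")) == NULL)
                pg_fatal("could not open file \"%s\": %m", report->path);

            fprintf(report->file, "database:\"%s\" object:\"%s\"\n",
                    dbinfo->db_name, PQgetvalue(res, i, 0));
        }
    }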

>From 67479ba2693eec0f16208b3e43a2d22cba38a15e Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 21:09:33 -0500
Subject: [PATCH v10 03/11] Use pg_upgrade's new parallel framework to get
 relation info.

Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
 src/bin/pg_upgrade/info.c | 296 ++++++++++++++++++++------------------
 1 file changed, 154 insertions(+), 142 deletions(-)

diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index d3c1e8918d..2bfc8dcfba 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -11,6 +11,7 @@
 
 #include "access/transam.h"
 #include "catalog/pg_class_d.h"
+#include "pqexpbuffer.h"
 #include "pg_upgrade.h"
 
 static void create_rel_filename_map(const char *old_data, const char *new_data,
@@ -22,12 +23,14 @@ static void report_unmatched_relation(const RelInfo *rel, 
const DbInfo *db,
 static void free_db_and_rel_infos(DbInfoArr *db_arr);
 static void get_template0_info(ClusterInfo *cluster);
 static void get_db_infos(ClusterInfo *cluster);
-static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
+static char *get_rel_infos_query(void);
+static void process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg);
 static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
 static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo);
+static char *get_old_cluster_logical_slot_infos_query(void);
+static void process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg);
 
 
 /*
@@ -276,7 +279,9 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
 void
 get_db_rel_and_slot_infos(ClusterInfo *cluster)
 {
-       int                     dbnum;
+       UpgradeTask *task = upgrade_task_create();
+       char       *rel_infos_query = NULL;
+       char       *logical_slot_infos_query = NULL;
 
        if (cluster->dbarr.dbs != NULL)
                free_db_and_rel_infos(&cluster->dbarr);
@@ -284,15 +289,37 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster)
        get_template0_info(cluster);
        get_db_infos(cluster);
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+       rel_infos_query = get_rel_infos_query();
+       upgrade_task_add_step(task,
+                                                 rel_infos_query,
+                                                 process_rel_infos,
+                                                 true, NULL);
+
+       /*
+        * Logical slots are only carried over to the new cluster when the old
+        * cluster is on PG17 or newer.  This is because before that the logical
+        * slots are not saved at shutdown, so there is no guarantee that the
+        * latest confirmed_flush_lsn is saved to disk which can lead to data
+        * loss. It is still not guaranteed for manually created slots in PG17, so
+        * subsequent checks done in check_old_cluster_for_valid_slots() would
+        * raise a FATAL error if such slots are included.
+        */
+       if (cluster == &old_cluster &&
+               GET_MAJOR_VERSION(cluster->major_version) > 1600)
        {
-               DbInfo     *pDbInfo = &cluster->dbarr.dbs[dbnum];
+               logical_slot_infos_query = get_old_cluster_logical_slot_infos_query();
+               upgrade_task_add_step(task,
+                                                         logical_slot_infos_query,
+                                                         process_old_cluster_logical_slot_infos,
+                                                         true, NULL);
+       }
 
-               get_rel_infos(cluster, pDbInfo);
+       upgrade_task_run(task, cluster);
+       upgrade_task_free(task);
 
-               if (cluster == &old_cluster)
-                       get_old_cluster_logical_slot_infos(pDbInfo);
-       }
+       pg_free(rel_infos_query);
+       if (logical_slot_infos_query)
+               pg_free(logical_slot_infos_query);
 
        if (cluster == &old_cluster)
                pg_log(PG_VERBOSE, "\nsource databases:");
@@ -431,40 +458,21 @@ get_db_infos(ClusterInfo *cluster)
 
 
 /*
- * get_rel_infos()
+ * get_rel_infos_query()
  *
- * gets the relinfos for all the user tables and indexes of the database
- * referred to by "dbinfo".
+ * Returns the query for retrieving the relation information for all the user
+ * tables and indexes in the database, for use by get_db_rel_and_slot_infos()'s
+ * UpgradeTask.
  *
- * Note: the resulting RelInfo array is assumed to be sorted by OID.
- * This allows later processing to match up old and new databases efficiently.
+ * Note: the result is assumed to be sorted by OID.  This allows later
+ * processing to match up old and new databases efficiently.
  */
-static void
-get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
+static char *
+get_rel_infos_query(void)
 {
-       PGconn     *conn = connectToServer(cluster,
-                                                                          dbinfo->db_name);
-       PGresult   *res;
-       RelInfo    *relinfos;
-       int                     ntups;
-       int                     relnum;
-       int                     num_rels = 0;
-       char       *nspname = NULL;
-       char       *relname = NULL;
-       char       *tablespace = NULL;
-       int                     i_spclocation,
-                               i_nspname,
-                               i_relname,
-                               i_reloid,
-                               i_indtable,
-                               i_toastheap,
-                               i_relfilenumber,
-                               i_reltablespace;
-       char            query[QUERY_ALLOC];
-       char       *last_namespace = NULL,
-                          *last_tablespace = NULL;
+       PQExpBufferData query;
 
-       query[0] = '\0';                        /* initialize query string to empty */
+       initPQExpBuffer(&query);
 
        /*
         * Create a CTE that collects OIDs of regular user tables and matviews,
@@ -476,34 +484,34 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
         * output, so we have to copy that system table.  It's easiest to do that
         * by treating it as a user table.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
-                        "WITH regular_heap (reloid, indtable, toastheap) AS ( "
-                        "  SELECT c.oid, 0::oid, 0::oid "
-                        "  FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n "
-                        "         ON c.relnamespace = n.oid "
-                        "  WHERE relkind IN (" CppAsString2(RELKIND_RELATION) ", "
-                        CppAsString2(RELKIND_MATVIEW) ") AND "
+       appendPQExpBuffer(&query,
+                                         "WITH regular_heap (reloid, indtable, toastheap) AS ( "
+                                         "  SELECT c.oid, 0::oid, 0::oid "
+                                         "  FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n "
+                                         "         ON c.relnamespace = n.oid "
+                                         "  WHERE relkind IN (" CppAsString2(RELKIND_RELATION) ", "
+                                         CppAsString2(RELKIND_MATVIEW) ") AND "
        /* exclude possible orphaned temp tables */
-                        "    ((n.nspname !~ '^pg_temp_' AND "
-                        "      n.nspname !~ '^pg_toast_temp_' AND "
-                        "      n.nspname NOT IN ('pg_catalog', 'information_schema', "
-                        "                        'binary_upgrade', 'pg_toast') AND "
-                        "      c.oid >= %u::pg_catalog.oid) OR "
-                        "     (n.nspname = 'pg_catalog' AND "
-                        "      relname IN ('pg_largeobject') ))), ",
-                        FirstNormalObjectId);
+                                         "    ((n.nspname !~ '^pg_temp_' AND "
+                                         "      n.nspname !~ '^pg_toast_temp_' AND "
+                                         "      n.nspname NOT IN ('pg_catalog', 'information_schema', "
+                                         "                        'binary_upgrade', 'pg_toast') AND "
+                                         "      c.oid >= %u::pg_catalog.oid) OR "
+                                         "     (n.nspname = 'pg_catalog' AND "
+                                         "      relname IN ('pg_largeobject') ))), ",
+                                         FirstNormalObjectId);
 
        /*
         * Add a CTE that collects OIDs of toast tables belonging to the tables
         * selected by the regular_heap CTE.  (We have to do this separately
         * because the namespace-name rules above don't work for toast tables.)
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
-                        "  toast_heap (reloid, indtable, toastheap) AS ( "
-                        "  SELECT c.reltoastrelid, 0::oid, c.oid "
-                        "  FROM regular_heap JOIN pg_catalog.pg_class c "
-                        "      ON regular_heap.reloid = c.oid "
-                        "  WHERE c.reltoastrelid != 0), ");
+       appendPQExpBufferStr(&query,
+                                                "  toast_heap (reloid, indtable, toastheap) AS ( "
+                                                "  SELECT c.reltoastrelid, 0::oid, c.oid "
+                                                "  FROM regular_heap JOIN pg_catalog.pg_class c "
+                                                "      ON regular_heap.reloid = c.oid "
+                                                "  WHERE c.reltoastrelid != 0), ");
 
        /*
         * Add a CTE that collects OIDs of all valid indexes on the previously
@@ -511,53 +519,68 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
         * Testing indisready is necessary in 9.2, and harmless in earlier/later
         * versions.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
-                        "  all_index (reloid, indtable, toastheap) AS ( "
-                        "  SELECT indexrelid, indrelid, 0::oid "
-                        "  FROM pg_catalog.pg_index "
-                        "  WHERE indisvalid AND indisready "
-                        "    AND indrelid IN "
-                        "        (SELECT reloid FROM regular_heap "
-                        "         UNION ALL "
-                        "         SELECT reloid FROM toast_heap)) ");
+       appendPQExpBufferStr(&query,
+                                                "  all_index (reloid, indtable, toastheap) AS ( "
+                                                "  SELECT indexrelid, indrelid, 0::oid "
+                                                "  FROM pg_catalog.pg_index "
+                                                "  WHERE indisvalid AND indisready "
+                                                "    AND indrelid IN "
+                                                "        (SELECT reloid FROM regular_heap "
+                                                "         UNION ALL "
+                                                "         SELECT reloid FROM toast_heap)) ");
 
        /*
         * And now we can write the query that retrieves the data we want for each
         * heap and index relation.  Make sure result is sorted by OID.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
-                        "SELECT all_rels.*, n.nspname, c.relname, "
-                        "  c.relfilenode, c.reltablespace, "
-                        "  pg_catalog.pg_tablespace_location(t.oid) AS spclocation "
-                        "FROM (SELECT * FROM regular_heap "
-                        "      UNION ALL "
-                        "      SELECT * FROM toast_heap "
-                        "      UNION ALL "
-                        "      SELECT * FROM all_index) all_rels "
-                        "  JOIN pg_catalog.pg_class c "
-                        "      ON all_rels.reloid = c.oid "
-                        "  JOIN pg_catalog.pg_namespace n "
-                        "     ON c.relnamespace = n.oid "
-                        "  LEFT OUTER JOIN pg_catalog.pg_tablespace t "
-                        "     ON c.reltablespace = t.oid "
-                        "ORDER BY 1;");
-
-       res = executeQueryOrDie(conn, "%s", query);
-
-       ntups = PQntuples(res);
+       appendPQExpBufferStr(&query,
+                                                "SELECT all_rels.*, n.nspname, c.relname, "
+                                                "  c.relfilenode, c.reltablespace, "
+                                                "  pg_catalog.pg_tablespace_location(t.oid) AS spclocation "
+                                                "FROM (SELECT * FROM regular_heap "
+                                                "      UNION ALL "
+                                                "      SELECT * FROM toast_heap "
+                                                "      UNION ALL "
+                                                "      SELECT * FROM all_index) all_rels "
+                                                "  JOIN pg_catalog.pg_class c "
+                                                "      ON all_rels.reloid = c.oid "
+                                                "  JOIN pg_catalog.pg_namespace n "
+                                                "     ON c.relnamespace = n.oid "
+                                                "  LEFT OUTER JOIN pg_catalog.pg_tablespace t "
+                                                "     ON c.reltablespace = t.oid "
+                                                "ORDER BY 1;");
+
+       return query.data;
+}
 
-       relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+/*
+ * Callback function for processing results of the query returned by
+ * get_rel_infos_query(), which is used for get_db_rel_and_slot_infos()'s
+ * UpgradeTask.  This function stores the relation information for later use.
+ */
+static void
+process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       int                     ntups = PQntuples(res);
+       RelInfo    *relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+       int                     i_reloid = PQfnumber(res, "reloid");
+       int                     i_indtable = PQfnumber(res, "indtable");
+       int                     i_toastheap = PQfnumber(res, "toastheap");
+       int                     i_nspname = PQfnumber(res, "nspname");
+       int                     i_relname = PQfnumber(res, "relname");
+       int                     i_relfilenumber = PQfnumber(res, "relfilenode");
+       int                     i_reltablespace = PQfnumber(res, "reltablespace");
+       int                     i_spclocation = PQfnumber(res, "spclocation");
+       int                     num_rels = 0;
+       char       *nspname = NULL;
+       char       *relname = NULL;
+       char       *tablespace = NULL;
+       char       *last_namespace = NULL;
+       char       *last_tablespace = NULL;
 
-       i_reloid = PQfnumber(res, "reloid");
-       i_indtable = PQfnumber(res, "indtable");
-       i_toastheap = PQfnumber(res, "toastheap");
-       i_nspname = PQfnumber(res, "nspname");
-       i_relname = PQfnumber(res, "relname");
-       i_relfilenumber = PQfnumber(res, "relfilenode");
-       i_reltablespace = PQfnumber(res, "reltablespace");
-       i_spclocation = PQfnumber(res, "spclocation");
+       AssertVariableIsOfType(&process_rel_infos, UpgradeTaskProcessCB);
 
-       for (relnum = 0; relnum < ntups; relnum++)
+       for (int relnum = 0; relnum < ntups; relnum++)
        {
                RelInfo    *curr = &relinfos[num_rels++];
 
@@ -610,44 +633,22 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
                        /* A zero reltablespace oid indicates the database tablespace. */
                        curr->tablespace = dbinfo->db_tablespace;
        }
-       PQclear(res);
-
-       PQfinish(conn);
 
        dbinfo->rel_arr.rels = relinfos;
        dbinfo->rel_arr.nrels = num_rels;
 }
 
 /*
- * get_old_cluster_logical_slot_infos()
- *
- * Gets the LogicalSlotInfos for all the logical replication slots of the
- * database referred to by "dbinfo". The status of each logical slot is gotten
- * here, but they are used at the checking phase. See
- * check_old_cluster_for_valid_slots().
+ * get_old_cluster_logical_slot_infos_query()
  *
- * Note: This function will not do anything if the old cluster is pre-PG17.
- * This is because before that the logical slots are not saved at shutdown, so
- * there is no guarantee that the latest confirmed_flush_lsn is saved to disk
- * which can lead to data loss. It is still not guaranteed for manually created
- * slots in PG17, so subsequent checks done in
- * check_old_cluster_for_valid_slots() would raise a FATAL error if such slots
- * are included.
+ * Returns the query for retrieving the logical slot information for all the
+ * logical replication slots in the database, for use by
+ * get_db_rel_and_slot_infos()'s UpgradeTask.  The status of each logical slot
+ * is checked in check_old_cluster_for_valid_slots().
  */
-static void
-get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
+static char *
+get_old_cluster_logical_slot_infos_query(void)
 {
-       PGconn     *conn;
-       PGresult   *res;
-       LogicalSlotInfo *slotinfos = NULL;
-       int                     num_slots;
-
-       /* Logical slots can be migrated since PG17. */
-       if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
-               return;
-
-       conn = connectToServer(&old_cluster, dbinfo->db_name);
-
        /*
         * Fetch the logical replication slot information. The check whether the
         * slot is considered caught up is done by an upgrade function. This
@@ -665,18 +666,32 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
         * started and stopped several times causing any temporary slots to be
         * removed.
         */
-       res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, "
-                                                       "%s as caught_up, invalidation_reason IS NOT NULL as invalid "
-                                                       "FROM pg_catalog.pg_replication_slots "
-                                                       "WHERE slot_type = 'logical' AND "
-                                                       "database = current_database() AND "
-                                                       "temporary IS FALSE;",
-                                                       user_opts.live_check ? "FALSE" :
-                                                       "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
-                                                       "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
-                                                       "END)");
-
-       num_slots = PQntuples(res);
+       return psprintf("SELECT slot_name, plugin, two_phase, failover, "
+                                       "%s as caught_up, invalidation_reason IS NOT NULL as invalid "
+                                       "FROM pg_catalog.pg_replication_slots "
+                                       "WHERE slot_type = 'logical' AND "
+                                       "database = current_database() AND "
+                                       "temporary IS FALSE;",
+                                       user_opts.live_check ? "FALSE" :
+                                       "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
+                                       "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
+                                       "END)");
+}
+
+/*
+ * Callback function for processing results of the query returned by
+ * get_old_cluster_logical_slot_infos_query(), which is used for
+ * get_db_rel_and_slot_infos()'s UpgradeTask.  This function stores the logical
+ * slot information for later use.
+ */
+static void
+process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       LogicalSlotInfo *slotinfos = NULL;
+       int                     num_slots = PQntuples(res);
+
+       AssertVariableIsOfType(&process_old_cluster_logical_slot_infos,
+                                                  UpgradeTaskProcessCB);
 
        if (num_slots)
        {
@@ -709,9 +724,6 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
                }
        }
 
-       PQclear(res);
-       PQfinish(conn);
-
        dbinfo->slot_arr.slots = slotinfos;
        dbinfo->slot_arr.nslots = num_slots;
 }
-- 
2.39.3 (Apple Git-146)
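
This is the first conversion that adds more than one step of its own to a
task.  As described in task.c, the framework concatenates the queries and
sends them with a single PQsendQuery() per database, so each database is
still visited over exactly one connection.  A condensed, hypothetical
two-step task of the same shape (the queries and callback names are made up
for illustration):

    static void
    my_two_step_task(ClusterInfo *cluster)
    {
        UpgradeTask *task = upgrade_task_create();

        upgrade_task_add_step(task, "SELECT version()",
                              process_version_cb, true, NULL);
        upgrade_task_add_step(task,
                              "SELECT name, setting FROM pg_catalog.pg_settings",
                              process_settings_cb, true, NULL);

        /* both steps run over one connection per database */
        upgrade_task_run(task, cluster);
        upgrade_task_free(task);
    }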

>From 0abadb72bf3675b0d48721b69bdaa5a734827112 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Wed, 28 Aug 2024 13:32:33 -0500
Subject: [PATCH v10 04/11] Use pg_upgrade's new parallel framework to get
 loadable libraries.

Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
 src/bin/pg_upgrade/function.c | 71 ++++++++++++++++++++++-------------
 1 file changed, 45 insertions(+), 26 deletions(-)

diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c
index 7e3abed098..0588347b49 100644
--- a/src/bin/pg_upgrade/function.c
+++ b/src/bin/pg_upgrade/function.c
@@ -42,6 +42,30 @@ library_name_compare(const void *p1, const void *p2)
                                          ((const LibraryInfo *) p2)->dbnum);
 }
 
+/*
+ * Private state for get_loadable_libraries()'s UpgradeTask.
+ */
+struct loadable_libraries_state
+{
+       PGresult  **ress;                       /* results for each database */
+       int                     totaltups;              /* number of tuples in all results */
+};
+
+/*
+ * Callback function for processing results of query for
+ * get_loadable_libraries()'s UpgradeTask.  This function stores the results
+ * for later use within get_loadable_libraries().
+ */
+static void
+process_loadable_libraries(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+	struct loadable_libraries_state *state = (struct loadable_libraries_state *) arg;
+
+	AssertVariableIsOfType(&process_loadable_libraries,
+						   UpgradeTaskProcessCB);
+
+       state->ress[dbinfo - old_cluster.dbarr.dbs] = res;
+       state->totaltups += PQntuples(res);
+}
 
 /*
  * get_loadable_libraries()
@@ -54,47 +78,41 @@ library_name_compare(const void *p1, const void *p2)
 void
 get_loadable_libraries(void)
 {
-       PGresult  **ress;
        int                     totaltups;
        int                     dbnum;
        int                     n_libinfos;
+       UpgradeTask *task = upgrade_task_create();
+       struct loadable_libraries_state state;
+       char       *query;
 
-	ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
-	totaltups = 0;
+	state.ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
+       state.totaltups = 0;
 
-       /* Fetch all library names, removing duplicates within each DB */
-       for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
-       {
-		DbInfo	   *active_db = &old_cluster.dbarr.dbs[dbnum];
-		PGconn	   *conn = connectToServer(&old_cluster, active_db->db_name);
+       query = psprintf("SELECT DISTINCT probin "
+                                        "FROM pg_catalog.pg_proc "
+                                        "WHERE prolang = %u AND "
+                                        "probin IS NOT NULL AND "
+                                        "oid >= %u;",
+                                        ClanguageId,
+                                        FirstNormalObjectId);
 
-		/*
-		 * Fetch all libraries containing non-built-in C functions in this DB.
-		 */
-		ress[dbnum] = executeQueryOrDie(conn,
-										"SELECT DISTINCT probin "
-										"FROM pg_catalog.pg_proc "
-										"WHERE prolang = %u AND "
-										"probin IS NOT NULL AND "
-										"oid >= %u;",
-										ClanguageId,
-										FirstNormalObjectId);
-               totaltups += PQntuples(ress[dbnum]);
-
-               PQfinish(conn);
-       }
+       upgrade_task_add_step(task, query, process_loadable_libraries,
+                                                 false, &state);
+
+       upgrade_task_run(task, &old_cluster);
+       upgrade_task_free(task);
 
        /*
         * Allocate memory for required libraries and logical replication output
         * plugins.
         */
-       n_libinfos = totaltups + count_old_cluster_logical_slots();
+       n_libinfos = state.totaltups + count_old_cluster_logical_slots();
 	os_info.libraries = (LibraryInfo *) pg_malloc(sizeof(LibraryInfo) * n_libinfos);
        totaltups = 0;
 
        for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
        {
-               PGresult   *res = ress[dbnum];
+               PGresult   *res = state.ress[dbnum];
                int                     ntups;
                int                     rowno;
 		LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
@@ -129,7 +147,8 @@ get_loadable_libraries(void)
                }
        }
 
-       pg_free(ress);
+       pg_free(state.ress);
+       pg_free(query);
 
        os_info.num_libraries = totaltups;
 }
-- 
2.39.3 (Apple Git-146)
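
A note on the boolean passed to upgrade_task_add_step(): this patch passes
false where every other caller in the series passes true, and it is the
only caller that keeps the PGresults around after the callback returns, so
the flag evidently tells the framework whether to free each result itself.
The indexing in the callback also deserves a remark: dbinfo -
old_cluster.dbarr.dbs works because dbinfo always points into the
old_cluster.dbarr.dbs array, so pointer subtraction recovers the database
number.  A condensed sketch of the same idea (hypothetical names):

    struct keep_results_state
    {
        PGresult  **ress;       /* one slot per database, indexed by dbnum */
    };

    static void
    keep_results(DbInfo *dbinfo, PGresult *res, void *arg)
    {
        struct keep_results_state *state = (struct keep_results_state *) arg;

        /* dbinfo points into old_cluster.dbarr.dbs; subtraction gives dbnum */
        state->ress[dbinfo - old_cluster.dbarr.dbs] = res;
    }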

>From d12f81e0320b388434b24d1d679571764e7fbd06 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Wed, 28 Aug 2024 14:18:39 -0500
Subject: [PATCH v10 05/11] Use pg_upgrade's new parallel framework for
 extension updates.

Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
 src/bin/pg_upgrade/version.c | 94 +++++++++++++++++++-----------------
 1 file changed, 49 insertions(+), 45 deletions(-)

diff --git a/src/bin/pg_upgrade/version.c b/src/bin/pg_upgrade/version.c
index 2de6dffccd..5084b08805 100644
--- a/src/bin/pg_upgrade/version.c
+++ b/src/bin/pg_upgrade/version.c
@@ -139,6 +139,41 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
                check_ok();
 }
 
+/*
+ * Callback function for processing results of query for
+ * report_extension_updates()'s UpgradeTask.  If the query returned any rows,
+ * write the details to the report file.
+ */
+static void
+process_extension_updates(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       int                     i_name = PQfnumber(res, "name");
+       UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+
+	AssertVariableIsOfType(&process_extension_updates,
+						   UpgradeTaskProcessCB);
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+		if (report->file == NULL &&
+			(report->file = fopen_priv(report->path, "w")) == NULL)
+			pg_fatal("could not open file \"%s\": %m", report->path);
+               if (!db_used)
+               {
+                       PQExpBufferData connectbuf;
+
+                       initPQExpBuffer(&connectbuf);
+                       appendPsqlMetaConnect(&connectbuf, dbinfo->db_name);
+                       fputs(connectbuf.data, report->file);
+                       termPQExpBuffer(&connectbuf);
+                       db_used = true;
+               }
+		fprintf(report->file, "ALTER EXTENSION %s UPDATE;\n",
+				quote_identifier(PQgetvalue(res, rowno, i_name)));
+       }
+}
+
 /*
  * report_extension_updates()
  *     Report extensions that should be updated.
@@ -146,57 +181,26 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
 void
 report_extension_updates(ClusterInfo *cluster)
 {
-       int                     dbnum;
-       FILE       *script = NULL;
-       char       *output_path = "update_extensions.sql";
+       UpgradeTaskReport report;
+       UpgradeTask *task = upgrade_task_create();
+       const char *query = "SELECT name "
+               "FROM pg_available_extensions "
+               "WHERE installed_version != default_version";
 
        prep_status("Checking for extension updates");
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_name;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /* find extensions needing updates */
-		res = executeQueryOrDie(conn,
-								"SELECT name "
-								"FROM pg_available_extensions "
-								"WHERE installed_version != default_version"
-			);
+       report.file = NULL;
+       strcpy(report.path, "update_extensions.sql");
 
-               ntups = PQntuples(res);
-               i_name = PQfnumber(res, "name");
-		for (rowno = 0; rowno < ntups; rowno++)
-		{
-			if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-				pg_fatal("could not open file \"%s\": %m", output_path);
-                       if (!db_used)
-                       {
-                               PQExpBufferData connectbuf;
+       upgrade_task_add_step(task, query, process_extension_updates,
+                                                 true, &report);
 
-                               initPQExpBuffer(&connectbuf);
-				appendPsqlMetaConnect(&connectbuf, active_db->db_name);
-                               fputs(connectbuf.data, script);
-                               termPQExpBuffer(&connectbuf);
-                               db_used = true;
-                       }
-                       fprintf(script, "ALTER EXTENSION %s UPDATE;\n",
-                                       quote_identifier(PQgetvalue(res, rowno, 
i_name)));
-               }
+       upgrade_task_run(task, cluster);
+       upgrade_task_free(task);
 
-               PQclear(res);
-
-               PQfinish(conn);
-       }
-
-       if (script)
+       if (report.file)
        {
-               fclose(script);
+               fclose(report.file);
                report_status(PG_REPORT, "notice");
                pg_log(PG_REPORT, "\n"
                           "Your installation contains extensions that should 
be updated\n"
@@ -204,7 +208,7 @@ report_extension_updates(ClusterInfo *cluster)
                           "    %s\n"
                           "when executed by psql by the database superuser 
will update\n"
                           "these extensions.",
-                          output_path);
+                          report.path);
        }
        else
                check_ok();
-- 
2.39.3 (Apple Git-146)
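
The file written by process_extension_updates() is an executable psql
script: appendPsqlMetaConnect() emits a \connect line the first time a
database contributes a row, and each returned extension name becomes an
ALTER EXTENSION statement (quote_identifier() adds quotes only when the
name needs them).  For a database "mydb" with an out-of-date "hstore"
(both names hypothetical), update_extensions.sql would read roughly:

    \connect mydb
    ALTER EXTENSION hstore UPDATE;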

>From 7a5405cdd97bfb0a4a69167c412ef47b4380ba94 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Sat, 6 Jul 2024 21:06:31 -0500
Subject: [PATCH v10 06/11] Use pg_upgrade's new parallel framework for data
 type checks.

Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
 src/bin/pg_upgrade/check.c | 351 ++++++++++++++++++++-----------------
 1 file changed, 191 insertions(+), 160 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index f8160e0140..f935b53e1f 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -314,6 +314,147 @@ static DataTypesUsageChecks data_types_usage_checks[] =
        }
 };
 
+/*
+ * Private state for check_for_data_types_usage()'s UpgradeTask.
+ */
+struct data_type_check_state
+{
+       DataTypesUsageChecks *check;    /* the check for this step */
+	bool	   *result;			/* true if check failed for any database */
+       PQExpBuffer *report;            /* buffer for report on failed checks */
+};
+
+/*
+ * Returns a palloc'd query string for the data type check, for use by
+ * check_for_data_types_usage()'s UpgradeTask.
+ */
+static char *
+data_type_check_query(int checknum)
+{
+       DataTypesUsageChecks *check = &data_types_usage_checks[checknum];
+
+       return psprintf("WITH RECURSIVE oids AS ( "
+       /* start with the type(s) returned by base_query */
+                                       "       %s "
+                                       "       UNION ALL "
+                                       "       SELECT * FROM ( "
+       /* inner WITH because we can only reference the CTE once */
+                                       "               WITH x AS (SELECT oid 
FROM oids) "
+       /* domains on any type selected so far */
+                                       "                       SELECT t.oid 
FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
+                                       "                       UNION ALL "
+       /* arrays over any type selected so far */
+                                       "                       SELECT t.oid 
FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
+                                       "                       UNION ALL "
+       /* composite types containing any type selected so far */
+                                       "                       SELECT t.oid 
FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
+                                       "                       WHERE t.typtype 
= 'c' AND "
+                                       "                                 t.oid 
= c.reltype AND "
+                                       "                                 c.oid 
= a.attrelid AND "
+                                       "                                 NOT 
a.attisdropped AND "
+                                       "                                 
a.atttypid = x.oid "
+                                       "                       UNION ALL "
+       /* ranges containing any type selected so far */
+                                       "                       SELECT t.oid 
FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
+                                       "                       WHERE t.typtype 
= 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
+                                       "       ) foo "
+                                       ") "
+       /* now look for stored columns of any such type */
+                                       "SELECT n.nspname, c.relname, a.attname 
"
+                                       "FROM   pg_catalog.pg_class c, "
+                                       "               pg_catalog.pg_namespace 
n, "
+                                       "               pg_catalog.pg_attribute 
a "
+                                       "WHERE  c.oid = a.attrelid AND "
+                                       "               NOT a.attisdropped AND "
+                                       "               a.atttypid IN (SELECT 
oid FROM oids) AND "
+                                       "               c.relkind IN ("
+                                       CppAsString2(RELKIND_RELATION) ", "
+                                       CppAsString2(RELKIND_MATVIEW) ", "
+                                       CppAsString2(RELKIND_INDEX) ") AND "
+                                       "               c.relnamespace = n.oid 
AND "
+       /* exclude possible orphaned temp tables */
+                                       "               n.nspname !~ 
'^pg_temp_' AND "
+                                       "               n.nspname !~ 
'^pg_toast_temp_' AND "
+       /* exclude system catalogs, too */
+                                       "               n.nspname NOT IN 
('pg_catalog', 'information_schema')",
+                                       check->base_query);
+}
+
+/*
+ * Callback function for processing results of queries for
+ * check_for_data_types_usage()'s UpgradeTask.  If the query returned any rows
+ * (i.e., the check failed), write the details to the report file.
+ */
+static void
+process_data_type_check(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+	struct data_type_check_state *state = (struct data_type_check_state *) arg;
+       int                     ntups = PQntuples(res);
+
+       AssertVariableIsOfType(&process_data_type_check, UpgradeTaskProcessCB);
+
+       if (ntups)
+       {
+               char            output_path[MAXPGPATH];
+               int                     i_nspname;
+               int                     i_relname;
+               int                     i_attname;
+               FILE       *script = NULL;
+               bool            db_used = false;
+
+               snprintf(output_path, sizeof(output_path), "%s/%s",
+                                log_opts.basedir,
+                                state->check->report_filename);
+
+               /*
+		 * Make sure we have a buffer to save reports to now that we found a
+                * first failing check.
+                */
+               if (*state->report == NULL)
+                       *state->report = createPQExpBuffer();
+
+               /*
+		 * If this is the first time we see an error for the check in question
+                * then print a status message of the failure.
+                */
+               if (!(*state->result))
+               {
+			pg_log(PG_REPORT, "    failed check: %s", _(state->check->status));
+			appendPQExpBuffer(*state->report, "\n%s\n%s    %s\n",
+							  _(state->check->report_text),
+							  _("A list of the problem columns is in the file:"),
+							  output_path);
+               }
+               *state->result = true;
+
+               i_nspname = PQfnumber(res, "nspname");
+               i_relname = PQfnumber(res, "relname");
+               i_attname = PQfnumber(res, "attname");
+
+               for (int rowno = 0; rowno < ntups; rowno++)
+               {
+			if (script == NULL && (script = fopen_priv(output_path, "a")) == NULL)
+				pg_fatal("could not open file \"%s\": %m", output_path);
+
+                       if (!db_used)
+                       {
+                               fprintf(script, "In database: %s\n", 
dbinfo->db_name);
+                               db_used = true;
+                       }
+                       fprintf(script, "  %s.%s.%s\n",
+                                       PQgetvalue(res, rowno, i_nspname),
+                                       PQgetvalue(res, rowno, i_relname),
+                                       PQgetvalue(res, rowno, i_attname));
+               }
+
+               if (script)
+               {
+                       fclose(script);
+                       script = NULL;
+               }
+       }
+}
+
 /*
  * check_for_data_types_usage()
  *     Detect whether there are any stored columns depending on given type(s)
@@ -334,13 +475,15 @@ static DataTypesUsageChecks data_types_usage_checks[] =
  * there's no storage involved in a view.
  */
 static void
-check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks)
+check_for_data_types_usage(ClusterInfo *cluster)
 {
-       bool            found = false;
        bool       *results;
-       PQExpBufferData report;
-       DataTypesUsageChecks *tmp = checks;
+       PQExpBuffer report = NULL;
+       DataTypesUsageChecks *tmp = data_types_usage_checks;
        int                     n_data_types_usage_checks = 0;
+       UpgradeTask *task = upgrade_task_create();
+       char      **queries = NULL;
+       struct data_type_check_state *states;
 
        prep_status("Checking data type usage");
 
@@ -353,175 +496,63 @@ check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks)
 
        /* Prepare an array to store the results of checks in */
        results = pg_malloc0(sizeof(bool) * n_data_types_usage_checks);
+       queries = pg_malloc0(sizeof(char *) * n_data_types_usage_checks);
+	states = pg_malloc0(sizeof(struct data_type_check_state) * n_data_types_usage_checks);
 
-       /*
-        * Connect to each database in the cluster and run all defined checks
-        * against that database before trying the next one.
-        */
-       for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+       for (int i = 0; i < n_data_types_usage_checks; i++)
        {
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
+               DataTypesUsageChecks *check = &data_types_usage_checks[i];
 
-		for (int checknum = 0; checknum < n_data_types_usage_checks; checknum++)
+               if (check->threshold_version == MANUAL_CHECK)
                {
-                       PGresult   *res;
-                       int                     ntups;
-                       int                     i_nspname;
-                       int                     i_relname;
-                       int                     i_attname;
-                       FILE       *script = NULL;
-                       bool            db_used = false;
-                       char            output_path[MAXPGPATH];
-                       DataTypesUsageChecks *cur_check = &checks[checknum];
-
-                       if (cur_check->threshold_version == MANUAL_CHECK)
-                       {
-                               Assert(cur_check->version_hook);
-
-				/*
-				 * Make sure that the check applies to the current cluster
-				 * version and skip if not. If no check hook has been defined
-				 * we run the check for all versions.
-				 */
-                               if (!cur_check->version_hook(cluster))
-                                       continue;
-                       }
-                       else if (cur_check->threshold_version != ALL_VERSIONS)
-                       {
-				if (GET_MAJOR_VERSION(cluster->major_version) > cur_check->threshold_version)
-                                       continue;
-                       }
-                       else
-				Assert(cur_check->threshold_version == ALL_VERSIONS);
-
-                       snprintf(output_path, sizeof(output_path), "%s/%s",
-                                        log_opts.basedir,
-                                        cur_check->report_filename);
+                       Assert(check->version_hook);
 
 			/*
-			 * The type(s) of interest might be wrapped in a domain, array,
-			 * composite, or range, and these container types can be nested
-			 * (to varying extents depending on server version, but that's not
-			 * of concern here).  To handle all these cases we need a
-			 * recursive CTE.
+			 * Make sure that the check applies to the current cluster version
+			 * and skip it if not.
 			 */
-			res = executeQueryOrDie(conn,
-									"WITH RECURSIVE oids AS ( "
-			/* start with the type(s) returned by base_query */
-									"	%s "
-									"	UNION ALL "
-									"	SELECT * FROM ( "
-			/* inner WITH because we can only reference the CTE once */
-									"		WITH x AS (SELECT oid FROM oids) "
-			/* domains on any type selected so far */
-									"			SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
-									"			UNION ALL "
-			/* arrays over any type selected so far */
-									"			SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
-									"			UNION ALL "
-			/* composite types containing any type selected so far */
-									"			SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
-									"			WHERE t.typtype = 'c' AND "
-									"				  t.oid = c.reltype AND "
-									"				  c.oid = a.attrelid AND "
-									"				  NOT a.attisdropped AND "
-									"				  a.atttypid = x.oid "
-									"			UNION ALL "
-			/* ranges containing any type selected so far */
-									"			SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
-									"			WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
-									"	) foo "
-									") "
-			/* now look for stored columns of any such type */
-									"SELECT n.nspname, c.relname, a.attname "
-									"FROM	pg_catalog.pg_class c, "
-									"		pg_catalog.pg_namespace n, "
-									"		pg_catalog.pg_attribute a "
-									"WHERE	c.oid = a.attrelid AND "
-									"		NOT a.attisdropped AND "
-									"		a.atttypid IN (SELECT oid FROM oids) AND "
-									"		c.relkind IN ("
-									CppAsString2(RELKIND_RELATION) ", "
-									CppAsString2(RELKIND_MATVIEW) ", "
-									CppAsString2(RELKIND_INDEX) ") AND "
-									"		c.relnamespace = n.oid AND "
-			/* exclude possible orphaned temp tables */
-									"		n.nspname !~ '^pg_temp_' AND "
-									"		n.nspname !~ '^pg_toast_temp_' AND "
-			/* exclude system catalogs, too */
-									"		n.nspname NOT IN ('pg_catalog', 'information_schema')",
-									cur_check->base_query);
-
-			ntups = PQntuples(res);
+                       if (!check->version_hook(cluster))
+                               continue;
+               }
+               else if (check->threshold_version != ALL_VERSIONS)
+               {
+			if (GET_MAJOR_VERSION(cluster->major_version) > check->threshold_version)
+                               continue;
+               }
+               else
+                       Assert(check->threshold_version == ALL_VERSIONS);
 
-			/*
-			 * The datatype was found, so extract the data and log to the
-			 * requested filename. We need to open the file for appending
-			 * since the check might have already found the type in another
-			 * database earlier in the loop.
-			 */
-			if (ntups)
-			{
-				/*
-				 * Make sure we have a buffer to save reports to now that we
-				 * found a first failing check.
-				 */
-				if (!found)
-					initPQExpBuffer(&report);
-				found = true;
-
-				/*
-				 * If this is the first time we see an error for the check in
-				 * question then print a status message of the failure.
-				 */
-				if (!results[checknum])
-				{
-					pg_log(PG_REPORT, "    failed check: %s", _(cur_check->status));
-					appendPQExpBuffer(&report, "\n%s\n%s    %s\n",
-									  _(cur_check->report_text),
-									  _("A list of the problem columns is in the file:"),
-									  output_path);
-				}
-				results[checknum] = true;
-
-				i_nspname = PQfnumber(res, "nspname");
-				i_relname = PQfnumber(res, "relname");
-				i_attname = PQfnumber(res, "attname");
-
-				for (int rowno = 0; rowno < ntups; rowno++)
-				{
-					if (script == NULL && (script = fopen_priv(output_path, "a")) == NULL)
-						pg_fatal("could not open file \"%s\": %m", output_path);
-
-					if (!db_used)
-					{
-						fprintf(script, "In database: %s\n", active_db->db_name);
-						db_used = true;
-					}
-					fprintf(script, "  %s.%s.%s\n",
-							PQgetvalue(res, rowno, i_nspname),
-							PQgetvalue(res, rowno, i_relname),
-							PQgetvalue(res, rowno, i_attname));
-				}
-
-				if (script)
-				{
-					fclose(script);
-					script = NULL;
-				}
-			}
+               queries[i] = data_type_check_query(i);
 
-                       PQclear(res);
-               }
+               states[i].check = check;
+               states[i].result = &results[i];
+               states[i].report = &report;
 
-               PQfinish(conn);
+               upgrade_task_add_step(task, queries[i], process_data_type_check,
+                                                         true, &states[i]);
        }
 
-       if (found)
-               pg_fatal("Data type checks failed: %s", report.data);
+       /*
+        * Connect to each database in the cluster and run all defined checks
+        * against that database before trying the next one.
+        */
+       upgrade_task_run(task, cluster);
+       upgrade_task_free(task);
+
+	if (report)
+		pg_fatal("Data type checks failed: %s", report->data);
 
        pg_free(results);
+       for (int i = 0; i < n_data_types_usage_checks; i++)
+       {
+               if (queries[i])
+                       pg_free(queries[i]);
+       }
+       pg_free(queries);
+       pg_free(states);
 
        check_ok();
 }
@@ -616,7 +647,7 @@ check_and_dump_old_cluster(void)
                check_old_cluster_subscription_state();
        }
 
-       check_for_data_types_usage(&old_cluster, data_types_usage_checks);
+       check_for_data_types_usage(&old_cluster);
 
        /*
 	 * PG 14 changed the function signature of encoding conversion functions.
-- 
2.39.3 (Apple Git-146)
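
Unlike the single-step conversions, this one attaches many steps to one
task, a query/state pair for each data type check that applies to the
cluster version.  The framework can then run every applicable check in a
database over a single connection before moving on.  In outline, using
the arrays set up by the patch:

    UpgradeTask *task = upgrade_task_create();

    for (int i = 0; i < n_data_types_usage_checks; i++)
    {
        /* version-gated checks may be skipped entirely, as in the patch */
        upgrade_task_add_step(task, queries[i], process_data_type_check,
                              true, &states[i]);
    }
    upgrade_task_run(task, cluster);    /* one sweep over all databases */
    upgrade_task_free(task);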

>From dd239126aa373da9ab86f5cd8b9ec90d4e8fb64a Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 8 Jul 2024 21:00:20 -0500
Subject: [PATCH v10 07/11] Use pg_upgrade's new parallel framework for isn and
 int8 check.

Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
 src/bin/pg_upgrade/check.c | 97 ++++++++++++++++++++------------------
 1 file changed, 50 insertions(+), 47 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index f935b53e1f..b8af7e541b 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1225,6 +1225,39 @@ check_for_prepared_transactions(ClusterInfo *cluster)
        check_ok();
 }
 
+/*
+ * Callback function for processing result of query for
+ * check_for_isn_and_int8_passing_mismatch()'s UpgradeTask.  If the query
+ * returned any rows (i.e., the check failed), write the details to the report
+ * file.
+ */
+static void
+process_isn_and_int8_passing_mismatch(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       int                     i_nspname = PQfnumber(res, "nspname");
+       int                     i_proname = PQfnumber(res, "proname");
+       UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+
+       AssertVariableIsOfType(&process_isn_and_int8_passing_mismatch,
+                                                  UpgradeTaskProcessCB);
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (report->file == NULL &&
+                       (report->file = fopen_priv(report->path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", 
report->path);
+               if (!db_used)
+               {
+                       fprintf(report->file, "In database: %s\n", 
dbinfo->db_name);
+                       db_used = true;
+               }
+               fprintf(report->file, "  %s.%s\n",
+                               PQgetvalue(res, rowno, i_nspname),
+                               PQgetvalue(res, rowno, i_proname));
+       }
+}
 
 /*
  *     check_for_isn_and_int8_passing_mismatch()
@@ -1236,9 +1269,13 @@ check_for_prepared_transactions(ClusterInfo *cluster)
 static void
 check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
 {
-       int                     dbnum;
-       FILE       *script = NULL;
-       char            output_path[MAXPGPATH];
+       UpgradeTask *task;
+       UpgradeTaskReport report;
+       const char *query = "SELECT n.nspname, p.proname "
+               "FROM   pg_catalog.pg_proc p, "
+               "       pg_catalog.pg_namespace n "
+               "WHERE  p.pronamespace = n.oid AND "
+               "       p.probin = '$libdir/isn'";
 
        prep_status("Checking for contrib/isn with bigint-passing mismatch");
 
@@ -1250,54 +1287,20 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
                return;
        }
 
-       snprintf(output_path, sizeof(output_path), "%s/%s",
+       report.file = NULL;
+       snprintf(report.path, sizeof(report.path), "%s/%s",
                         log_opts.basedir,
                         "contrib_isn_and_int8_pass_by_value.txt");
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_nspname,
-                                       i_proname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /* Find any functions coming from contrib/isn */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT 
n.nspname, p.proname "
-                                                               "FROM   
pg_catalog.pg_proc p, "
-                                                               "               
pg_catalog.pg_namespace n "
-                                                               "WHERE  
p.pronamespace = n.oid AND "
-                                                               "               
p.probin = '$libdir/isn'");
-
-               ntups = PQntuples(res);
-               i_nspname = PQfnumber(res, "nspname");
-               i_proname = PQfnumber(res, "proname");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-			if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-				pg_fatal("could not open file \"%s\": %m", output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", 
active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  %s.%s\n",
-                                       PQgetvalue(res, rowno, i_nspname),
-                                       PQgetvalue(res, rowno, i_proname));
-               }
-
-               PQclear(res);
-
-               PQfinish(conn);
-       }
+       task = upgrade_task_create();
+	upgrade_task_add_step(task, query, process_isn_and_int8_passing_mismatch,
+						  true, &report);
+       upgrade_task_run(task, cluster);
+       upgrade_task_free(task);
 
-       if (script)
+       if (report.file)
        {
-               fclose(script);
+               fclose(report.file);
                pg_log(PG_REPORT, "fatal");
                pg_fatal("Your installation contains \"contrib/isn\" functions 
which rely on the\n"
                                 "bigint data type.  Your old and new clusters 
pass bigint values\n"
@@ -1305,7 +1308,7 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
                                 "manually dump databases in the old cluster 
that use \"contrib/isn\"\n"
                                 "facilities, drop them, perform the upgrade, 
and then restore them.  A\n"
                                 "list of the problem functions is in the 
file:\n"
-                                "    %s", output_path);
+                                "    %s", report.path);
        }
        else
                check_ok();
-- 
2.39.3 (Apple Git-146)
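
The remaining check conversions all reduce to the same UpgradeTaskReport
idiom: start with report.file set to NULL, point report.path at the output
file, let the callback open the file lazily on the first offending row,
and treat a non-NULL report.file after the run as "check failed".
Distilled, with process_cb and the file name standing in for the
per-check specifics:

    UpgradeTaskReport report;
    UpgradeTask *task = upgrade_task_create();

    report.file = NULL;
    snprintf(report.path, sizeof(report.path), "%s/%s",
             log_opts.basedir, "some_check.txt");

    upgrade_task_add_step(task, query, process_cb, true, &report);
    upgrade_task_run(task, cluster);
    upgrade_task_free(task);

    if (report.file)
    {
        fclose(report.file);
        pg_fatal("Check failed.  A list of the problems is in the file:\n"
                 "    %s", report.path);
    }
    else
        check_ok();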

>From ac3190b807bdac244d448f9abc7cc79782349d26 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Wed, 28 Aug 2024 15:10:19 -0500
Subject: [PATCH v10 08/11] Use pg_upgrade's new parallel framework for postfix
 operator check.

Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
 src/bin/pg_upgrade/check.c | 146 +++++++++++++++++++------------------
 1 file changed, 75 insertions(+), 71 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index b8af7e541b..28c4ddbca3 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1315,95 +1315,99 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
 }
 
 /*
- * Verify that no user defined postfix operators exist.
+ * Callback function for processing result of query for
+ * check_for_user_defined_postfix_ops()'s UpgradeTask.  If the query returned
+ * any rows (i.e., the check failed), write the details to the report file.
  */
 static void
-check_for_user_defined_postfix_ops(ClusterInfo *cluster)
+process_user_defined_postfix_ops(DbInfo *dbinfo, PGresult *res, void *arg)
 {
-       int                     dbnum;
-       FILE       *script = NULL;
-       char            output_path[MAXPGPATH];
+       UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+       int                     ntups = PQntuples(res);
+       bool            db_used = false;
+       int                     i_oproid = PQfnumber(res, "oproid");
+       int                     i_oprnsp = PQfnumber(res, "oprnsp");
+       int                     i_oprname = PQfnumber(res, "oprname");
+       int                     i_typnsp = PQfnumber(res, "typnsp");
+       int                     i_typname = PQfnumber(res, "typname");
 
-       prep_status("Checking for user-defined postfix operators");
+       AssertVariableIsOfType(&process_user_defined_postfix_ops,
+                                                  UpgradeTaskProcessCB);
 
-       snprintf(output_path, sizeof(output_path), "%s/%s",
-                        log_opts.basedir,
-                        "postfix_ops.txt");
+       if (!ntups)
+               return;
 
-       /* Find any user defined postfix operators */
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+       for (int rowno = 0; rowno < ntups; rowno++)
        {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_oproid,
-                                       i_oprnsp,
-                                       i_oprname,
-                                       i_typnsp,
-                                       i_typname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /*
-		 * The query below hardcodes FirstNormalObjectId as 16384 rather than
-		 * interpolating that C #define into the query because, if that
-		 * #define is ever changed, the cutoff we want to use is the value
-		 * used by pre-version 14 servers, not that of some future version.
-                */
-		res = executeQueryOrDie(conn,
-								"SELECT o.oid AS oproid, "
-								"       n.nspname AS oprnsp, "
-								"       o.oprname, "
-								"       tn.nspname AS typnsp, "
-								"       t.typname "
-								"FROM pg_catalog.pg_operator o, "
-								"     pg_catalog.pg_namespace n, "
-								"     pg_catalog.pg_type t, "
-								"     pg_catalog.pg_namespace tn "
-								"WHERE o.oprnamespace = n.oid AND "
-								"      o.oprleft = t.oid AND "
-								"      t.typnamespace = tn.oid AND "
-								"      o.oprright = 0 AND "
-								"      o.oid >= 16384");
-               ntups = PQntuples(res);
-               i_oproid = PQfnumber(res, "oproid");
-               i_oprnsp = PQfnumber(res, "oprnsp");
-               i_oprname = PQfnumber(res, "oprname");
-               i_typnsp = PQfnumber(res, "typnsp");
-               i_typname = PQfnumber(res, "typname");
-               for (rowno = 0; rowno < ntups; rowno++)
+               if (report->file == NULL &&
+                       (report->file = fopen_priv(report->path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", 
report->path);
+               if (!db_used)
                {
-                       if (script == NULL &&
-                               (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", 
active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  (oid=%s) %s.%s (%s.%s, NONE)\n",
-                                       PQgetvalue(res, rowno, i_oproid),
-                                       PQgetvalue(res, rowno, i_oprnsp),
-                                       PQgetvalue(res, rowno, i_oprname),
-                                       PQgetvalue(res, rowno, i_typnsp),
-                                       PQgetvalue(res, rowno, i_typname));
+                       fprintf(report->file, "In database: %s\n", 
dbinfo->db_name);
+                       db_used = true;
                }
+               fprintf(report->file, "  (oid=%s) %s.%s (%s.%s, NONE)\n",
+                               PQgetvalue(res, rowno, i_oproid),
+                               PQgetvalue(res, rowno, i_oprnsp),
+                               PQgetvalue(res, rowno, i_oprname),
+                               PQgetvalue(res, rowno, i_typnsp),
+                               PQgetvalue(res, rowno, i_typname));
+       }
+}
 
-               PQclear(res);
+/*
+ * Verify that no user defined postfix operators exist.
+ */
+static void
+check_for_user_defined_postfix_ops(ClusterInfo *cluster)
+{
+       UpgradeTaskReport report;
+       UpgradeTask *task = upgrade_task_create();
+       const char *query;
 
-               PQfinish(conn);
-       }
+       /*
+        * The query below hardcodes FirstNormalObjectId as 16384 rather than
+	 * interpolating that C #define into the query because, if that #define is
+        * ever changed, the cutoff we want to use is the value used by
+        * pre-version 14 servers, not that of some future version.
+        */
+       query = "SELECT o.oid AS oproid, "
+               "       n.nspname AS oprnsp, "
+               "       o.oprname, "
+               "       tn.nspname AS typnsp, "
+               "       t.typname "
+               "FROM pg_catalog.pg_operator o, "
+               "     pg_catalog.pg_namespace n, "
+               "     pg_catalog.pg_type t, "
+               "     pg_catalog.pg_namespace tn "
+               "WHERE o.oprnamespace = n.oid AND "
+               "      o.oprleft = t.oid AND "
+               "      t.typnamespace = tn.oid AND "
+               "      o.oprright = 0 AND "
+               "      o.oid >= 16384";
 
-       if (script)
+       prep_status("Checking for user-defined postfix operators");
+
+       report.file = NULL;
+       snprintf(report.path, sizeof(report.path), "%s/%s",
+                        log_opts.basedir,
+                        "postfix_ops.txt");
+
+       upgrade_task_add_step(task, query, process_user_defined_postfix_ops,
+                                                 true, &report);
+       upgrade_task_run(task, cluster);
+       upgrade_task_free(task);
+
+       if (report.file)
        {
-               fclose(script);
+               fclose(report.file);
                pg_log(PG_REPORT, "fatal");
                pg_fatal("Your installation contains user-defined postfix 
operators, which are not\n"
                                 "supported anymore.  Consider dropping the 
postfix operators and replacing\n"
                                 "them with prefix operators or function 
calls.\n"
                                 "A list of user-defined postfix operators is 
in the file:\n"
-                                "    %s", output_path);
+                                "    %s", report.path);
        }
        else
                check_ok();
-- 
2.39.3 (Apple Git-146)
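
For reference, a postfix_ops.txt produced by the callback above would look
something like this (operator, type, and database names are hypothetical):

    In database: mydb
      (oid=16405) public.! (public.mytype, NONE)

The trailing NONE marks the absent right operand, which is exactly what
the oprright = 0 condition in the query selects for.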

>From d756f173cf94128c6a4fe11b9a48b90d33631298 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Wed, 28 Aug 2024 15:21:29 -0500
Subject: [PATCH v10 09/11] Use pg_upgrade's new parallel framework for
 polymorphics check.

Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
 src/bin/pg_upgrade/check.c | 159 +++++++++++++++++++------------------
 1 file changed, 83 insertions(+), 76 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 28c4ddbca3..92a3aa6a77 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1413,6 +1413,40 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
                check_ok();
 }
 
+/*
+ * Callback function for processing results of query for
+ * check_for_incompatible_polymorphics()'s UpgradeTask.  If the query returned
+ * any rows (i.e., the check failed), write the details to the report file.
+ */
+static void
+process_incompat_polymorphics(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       int                     i_objkind = PQfnumber(res, "objkind");
+       int                     i_objname = PQfnumber(res, "objname");
+
+       AssertVariableIsOfType(&process_incompat_polymorphics,
+                                                  UpgradeTaskProcessCB);
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (report->file == NULL &&
+                       (report->file = fopen_priv(report->path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", 
report->path);
+               if (!db_used)
+               {
+                       fprintf(report->file, "In database: %s\n", 
dbinfo->db_name);
+                       db_used = true;
+               }
+
+               fprintf(report->file, "  %s: %s\n",
+                               PQgetvalue(res, rowno, i_objkind),
+                               PQgetvalue(res, rowno, i_objname));
+       }
+}
+
 /*
  *     check_for_incompatible_polymorphics()
  *
@@ -1422,14 +1456,15 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
 static void
 check_for_incompatible_polymorphics(ClusterInfo *cluster)
 {
-       PGresult   *res;
-       FILE       *script = NULL;
-       char            output_path[MAXPGPATH];
        PQExpBufferData old_polymorphics;
+       UpgradeTask *task = upgrade_task_create();
+       UpgradeTaskReport report;
+       char       *query;
 
        prep_status("Checking for incompatible polymorphic functions");
 
-       snprintf(output_path, sizeof(output_path), "%s/%s",
+       report.file = NULL;
+       snprintf(report.path, sizeof(report.path), "%s/%s",
                         log_opts.basedir,
                         "incompatible_polymorphics.txt");
 
@@ -1453,80 +1488,51 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
 										 ", 'array_positions(anyarray,anyelement)'"
 										 ", 'width_bucket(anyelement,anyarray)'");
 
-       for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               bool            db_used = false;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-               int                     ntups;
-               int                     i_objkind,
-                                       i_objname;
-
-               /*
-		 * The query below hardcodes FirstNormalObjectId as 16384 rather than
-		 * interpolating that C #define into the query because, if that
-		 * #define is ever changed, the cutoff we want to use is the value
-		 * used by pre-version 14 servers, not that of some future version.
-                */
-		res = executeQueryOrDie(conn,
-		/* Aggregate transition functions */
-								"SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
-								"FROM pg_proc AS p "
-								"JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
-								"JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
-								"WHERE p.oid >= 16384 "
-								"AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
-								"AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
-
-		/* Aggregate final functions */
-								"UNION ALL "
-								"SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
-								"FROM pg_proc AS p "
-								"JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
-								"JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
-								"WHERE p.oid >= 16384 "
-								"AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
-								"AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
-
-		/* Operators */
-								"UNION ALL "
-								"SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
-								"FROM pg_operator AS op "
-								"WHERE op.oid >= 16384 "
-								"AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
-								"AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[]);",
-								old_polymorphics.data,
-								old_polymorphics.data,
-								old_polymorphics.data);
-
-               ntups = PQntuples(res);
-
-               i_objkind = PQfnumber(res, "objkind");
-               i_objname = PQfnumber(res, "objname");
-
-               for (int rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL &&
-                               (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", 
active_db->db_name);
-                               db_used = true;
-                       }
-
-                       fprintf(script, "  %s: %s\n",
-                                       PQgetvalue(res, rowno, i_objkind),
-                                       PQgetvalue(res, rowno, i_objname));
-               }
+       /*
+        * The query below hardcodes FirstNormalObjectId as 16384 rather than
+	 * interpolating that C #define into the query because, if that #define is
+        * ever changed, the cutoff we want to use is the value used by
+        * pre-version 14 servers, not that of some future version.
+        */
 
-               PQclear(res);
-               PQfinish(conn);
-       }
+	/* Aggregate transition functions */
+	query = psprintf("SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
+					 "FROM pg_proc AS p "
+					 "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
+					 "JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
+					 "WHERE p.oid >= 16384 "
+					 "AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
+					 "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+	/* Aggregate final functions */
+					 "UNION ALL "
+					 "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
+					 "FROM pg_proc AS p "
+					 "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
+					 "JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
+					 "WHERE p.oid >= 16384 "
+					 "AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
+					 "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+	/* Operators */
+					 "UNION ALL "
+					 "SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
+					 "FROM pg_operator AS op "
+					 "WHERE op.oid >= 16384 "
+					 "AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
+					 "AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[]);",
+					 old_polymorphics.data,
+					 old_polymorphics.data,
+					 old_polymorphics.data);
+
+       upgrade_task_add_step(task, query, process_incompat_polymorphics,
+                                                 true, &report);
+       upgrade_task_run(task, cluster);
+       upgrade_task_free(task);
 
-       if (script)
+       if (report.file)
        {
-               fclose(script);
+               fclose(report.file);
                pg_log(PG_REPORT, "fatal");
                pg_fatal("Your installation contains user-defined objects that 
refer to internal\n"
                                 "polymorphic functions with arguments of type 
\"anyarray\" or \"anyelement\".\n"
@@ -1534,12 +1540,13 @@ check_for_incompatible_polymorphics(ClusterInfo 
*cluster)
                                 "afterwards, changing them to refer to the new 
corresponding functions with\n"
                                 "arguments of type \"anycompatiblearray\" and 
\"anycompatible\".\n"
                                 "A list of the problematic objects is in the 
file:\n"
-                                "    %s", output_path);
+                                "    %s", report.path);
        }
        else
                check_ok();
 
        termPQExpBuffer(&old_polymorphics);
+       pg_free(query);
 }
 
 /*
-- 
2.39.3 (Apple Git-146)
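
A note for anyone reviewing the remaining patches in the series: after
conversion, every per-database check follows the same outline, so the
diffs below are largely mechanical.  Here is a minimal sketch of that
outline; example_check, process_example_check, and the query text are
hypothetical stand-ins for illustration, not code from the patches:

static void
process_example_check(DbInfo *dbinfo, PGresult *res, void *arg)
{
	UpgradeTaskReport *report = (UpgradeTaskReport *) arg;

	/* Open the report file lazily, on the first offending row. */
	if (PQntuples(res) > 0 && report->file == NULL &&
		(report->file = fopen_priv(report->path, "w")) == NULL)
		pg_fatal("could not open file \"%s\": %m", report->path);

	for (int rowno = 0; rowno < PQntuples(res); rowno++)
		fprintf(report->file, "In database %s: %s\n",
				dbinfo->db_name, PQgetvalue(res, rowno, 0));
}

static void
example_check(ClusterInfo *cluster)
{
	UpgradeTaskReport report;
	UpgradeTask *task = upgrade_task_create();

	report.file = NULL;
	snprintf(report.path, sizeof(report.path), "%s/%s",
			 log_opts.basedir, "example_check.txt");

	/* The task framework runs the query once in each database and
	 * hands each result set to the callback. */
	upgrade_task_add_step(task, "SELECT ...", process_example_check,
						  true, &report);
	upgrade_task_run(task, cluster);
	upgrade_task_free(task);

	/* A never-opened report file means the check passed. */
	if (report.file)
	{
		fclose(report.file);
		pg_fatal("check failed; details in:\n    %s", report.path);
	}
}

Opening the report file lazily is what lets the caller use report.file
as the pass/fail signal at the end.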

>From a6a7710e22d8d879a389de5777f724b45ed2e6f2 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Wed, 28 Aug 2024 15:27:37 -0500
Subject: [PATCH v10 10/11] Use pg_upgrade's new parallel framework for WITH
 OIDS check.

Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
 src/bin/pg_upgrade/check.c | 100 +++++++++++++++++++------------------
 1 file changed, 52 insertions(+), 48 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 92a3aa6a77..dff440b29a 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1550,72 +1550,76 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
 }
 
 /*
- * Verify that no tables are declared WITH OIDS.
+ * Callback function for processing results of query for
+ * check_for_tables_with_oids()'s UpgradeTask.  If the query returned any rows
+ * (i.e., the check failed), write the details to the report file.
  */
 static void
-check_for_tables_with_oids(ClusterInfo *cluster)
+process_with_oids_check(DbInfo *dbinfo, PGresult *res, void *arg)
 {
-       int                     dbnum;
-       FILE       *script = NULL;
-       char            output_path[MAXPGPATH];
+       UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       int                     i_nspname = PQfnumber(res, "nspname");
+       int                     i_relname = PQfnumber(res, "relname");
 
-       prep_status("Checking for tables WITH OIDS");
+       AssertVariableIsOfType(&process_with_oids_check, UpgradeTaskProcessCB);
 
-       snprintf(output_path, sizeof(output_path), "%s/%s",
-                        log_opts.basedir,
-                        "tables_with_oids.txt");
+       if (!ntups)
+               return;
 
-       /* Find any tables declared WITH OIDS */
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+       for (int rowno = 0; rowno < ntups; rowno++)
        {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_nspname,
-                                       i_relname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               res = executeQueryOrDie(conn,
-                                                               "SELECT n.nspname, c.relname "
-                                                               "FROM   pg_catalog.pg_class c, "
-                                                               "               pg_catalog.pg_namespace n "
-                                                               "WHERE  c.relnamespace = n.oid AND "
-                                                               "               c.relhasoids AND"
-                                                               "       n.nspname NOT IN ('pg_catalog')");
-
-               ntups = PQntuples(res);
-               i_nspname = PQfnumber(res, "nspname");
-               i_relname = PQfnumber(res, "relname");
-               for (rowno = 0; rowno < ntups; rowno++)
+               if (report->file == NULL &&
+                       (report->file = fopen_priv(report->path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", report->path);
+               if (!db_used)
                {
-                       if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  %s.%s\n",
-                                       PQgetvalue(res, rowno, i_nspname),
-                                       PQgetvalue(res, rowno, i_relname));
+                       fprintf(report->file, "In database: %s\n", dbinfo->db_name);
+                       db_used = true;
                }
+               fprintf(report->file, "  %s.%s\n",
+                               PQgetvalue(res, rowno, i_nspname),
+                               PQgetvalue(res, rowno, i_relname));
+       }
+}
 
-               PQclear(res);
+/*
+ * Verify that no tables are declared WITH OIDS.
+ */
+static void
+check_for_tables_with_oids(ClusterInfo *cluster)
+{
+       UpgradeTaskReport report;
+       UpgradeTask *task = upgrade_task_create();
+       const char *query = "SELECT n.nspname, c.relname "
+               "FROM   pg_catalog.pg_class c, "
+               "       pg_catalog.pg_namespace n "
+               "WHERE  c.relnamespace = n.oid AND "
+               "       c.relhasoids AND"
+               "       n.nspname NOT IN ('pg_catalog')";
 
-               PQfinish(conn);
-       }
+       prep_status("Checking for tables WITH OIDS");
 
-       if (script)
+       report.file = NULL;
+       snprintf(report.path, sizeof(report.path), "%s/%s",
+                        log_opts.basedir,
+                        "tables_with_oids.txt");
+
+       upgrade_task_add_step(task, query, process_with_oids_check,
+                                                 true, &report);
+       upgrade_task_run(task, cluster);
+       upgrade_task_free(task);
+
+       if (report.file)
        {
-               fclose(script);
+               fclose(report.file);
                pg_log(PG_REPORT, "fatal");
                pg_fatal("Your installation contains tables declared WITH OIDS, 
which is not\n"
                                 "supported anymore.  Consider removing the oid 
column using\n"
                                 "    ALTER TABLE ... SET WITHOUT OIDS;\n"
                                 "A list of tables with the problem is in the 
file:\n"
-                                "    %s", output_path);
+                                "    %s", report.path);
        }
        else
                check_ok();
-- 
2.39.3 (Apple Git-146)
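
For anyone eyeballing the generated reports while testing this one: a
failed WITH OIDS check writes tables_with_oids.txt under the log
directory, grouping offenders by database.  With a hypothetical
database "mydb" containing a table "public.legacy_table", the file
would read:

In database: mydb
  public.legacy_table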

>From 3cd66a2e1bbe3fc7a90c96962c158475f715245d Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Wed, 28 Aug 2024 15:35:31 -0500
Subject: [PATCH v10 11/11] Use pg_upgrade's parallel framework for encoding
 conversion check.

Reviewed-by: FIXME
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
---
 src/bin/pg_upgrade/check.c | 120 ++++++++++++++++++++-----------------
 1 file changed, 64 insertions(+), 56 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index dff440b29a..01ab3d0694 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1684,81 +1684,89 @@ check_for_pg_role_prefix(ClusterInfo *cluster)
 }
 
 /*
- * Verify that no user-defined encoding conversions exist.
+ * Callback function for processing results of query for
+ * check_for_user_defined_encoding_conversions()'s UpgradeTask.  If the query
+ * returned any rows (i.e., the check failed), write the details to the report
+ * file.
  */
 static void
-check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
+process_user_defined_encoding_conversions(DbInfo *dbinfo, PGresult *res, void *arg)
 {
-       int                     dbnum;
-       FILE       *script = NULL;
-       char            output_path[MAXPGPATH];
+       UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       int                     i_conoid = PQfnumber(res, "conoid");
+       int                     i_conname = PQfnumber(res, "conname");
+       int                     i_nspname = PQfnumber(res, "nspname");
 
-       prep_status("Checking for user-defined encoding conversions");
+       AssertVariableIsOfType(&process_user_defined_encoding_conversions,
+                                                  UpgradeTaskProcessCB);
 
-       snprintf(output_path, sizeof(output_path), "%s/%s",
-                        log_opts.basedir,
-                        "encoding_conversions.txt");
+       if (!ntups)
+               return;
 
-       /* Find any user defined encoding conversions */
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+       for (int rowno = 0; rowno < ntups; rowno++)
        {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_conoid,
-                                       i_conname,
-                                       i_nspname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /*
-                * The query below hardcodes FirstNormalObjectId as 16384 rather than
-                * interpolating that C #define into the query because, if that
-                * #define is ever changed, the cutoff we want to use is the value
-                * used by pre-version 14 servers, not that of some future version.
-                */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT c.oid as conoid, c.conname, n.nspname "
-                                                               "FROM pg_catalog.pg_conversion c, "
-                                                               "     pg_catalog.pg_namespace n "
-                                                               "WHERE c.connamespace = n.oid AND "
-                                                               "      c.oid >= 16384");
-               ntups = PQntuples(res);
-               i_conoid = PQfnumber(res, "conoid");
-               i_conname = PQfnumber(res, "conname");
-               i_nspname = PQfnumber(res, "nspname");
-               for (rowno = 0; rowno < ntups; rowno++)
+               if (report->file == NULL &&
+                       (report->file = fopen_priv(report->path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", report->path);
+               if (!db_used)
                {
-                       if (script == NULL &&
-                               (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  (oid=%s) %s.%s\n",
-                                       PQgetvalue(res, rowno, i_conoid),
-                                       PQgetvalue(res, rowno, i_nspname),
-                                       PQgetvalue(res, rowno, i_conname));
+                       fprintf(report->file, "In database: %s\n", dbinfo->db_name);
+                       db_used = true;
                }
+               fprintf(report->file, "  (oid=%s) %s.%s\n",
+                               PQgetvalue(res, rowno, i_conoid),
+                               PQgetvalue(res, rowno, i_nspname),
+                               PQgetvalue(res, rowno, i_conname));
+       }
+}
 
-               PQclear(res);
+/*
+ * Verify that no user-defined encoding conversions exist.
+ */
+static void
+check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
+{
+       UpgradeTaskReport report;
+       UpgradeTask *task = upgrade_task_create();
+       const char *query;
 
-               PQfinish(conn);
-       }
+       prep_status("Checking for user-defined encoding conversions");
 
-       if (script)
+       report.file = NULL;
+       snprintf(report.path, sizeof(report.path), "%s/%s",
+                        log_opts.basedir,
+                        "encoding_conversions.txt");
+
+       /*
+        * The query below hardcodes FirstNormalObjectId as 16384 rather than
+        * interpolating that C #define into the query because, if that #define is
+        * ever changed, the cutoff we want to use is the value used by
+        * pre-version 14 servers, not that of some future version.
+        */
+       query = "SELECT c.oid as conoid, c.conname, n.nspname "
+               "FROM pg_catalog.pg_conversion c, "
+               "     pg_catalog.pg_namespace n "
+               "WHERE c.connamespace = n.oid AND "
+               "      c.oid >= 16384";
+
+       upgrade_task_add_step(task, query,
+                                                 process_user_defined_encoding_conversions,
+                                                 true, &report);
+       upgrade_task_run(task, cluster);
+       upgrade_task_free(task);
+
+       if (report.file)
        {
-               fclose(script);
+               fclose(report.file);
                pg_log(PG_REPORT, "fatal");
                pg_fatal("Your installation contains user-defined encoding 
conversions.\n"
                                 "The conversion function parameters changed in 
PostgreSQL version 14\n"
                                 "so this cluster cannot currently be upgraded. 
 You can remove the\n"
                                 "encoding conversions in the old cluster and 
restart the upgrade.\n"
                                 "A list of user-defined encoding conversions 
is in the file:\n"
-                                "    %s", output_path);
+                                "    %s", report.path);
        }
        else
                check_ok();
-- 
2.39.3 (Apple Git-146)
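
To exercise the converted checks end to end, a --check run against a
cluster with many databases should make the parallelism visible, e.g.
something along the lines of (bindir/datadir arguments elided):

    pg_upgrade --check --jobs=4 \
        -b <old_bindir> -B <new_bindir> \
        -d <old_datadir> -D <new_datadir>

--check runs the per-database checks without modifying either cluster,
and --jobs determines the amount of parallelism.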
