On Fri, Jul 19, 2024 at 04:21:37PM -0500, Nathan Bossart wrote:
> However, while looking into this, I noticed that only one get_query
> callback (get_db_subscription_count()) actually customizes the generated
> query using information in the provided DbInfo.  AFAICT we can do this
> particular step without running a query in each database, as I mentioned
> elsewhere [0].  That should speed things up a bit and allow us to simplify
> the AsyncTask code.
> 
> With that, if we are willing to assume that a given get_query callback will
> generate the same string for all databases (and I think we should), we can
> run the callback once and save the string in the step for dispatch_query()
> to use.  This would look more like what you suggested in the quoted text.

Here is a new patch set.  I've included the latest revision of the patch to
fix get_db_subscription_count() from the other thread [0] as 0001 since I
expect that to be committed soon.  I've also moved the patch that moves the
"live_check" variable to "user_opts" to 0002 since I plan on committing
that sooner than later, too.  Otherwise, I've tried to address all feedback
provided thus far.

[0] https://commitfest.postgresql.org/49/5135/

-- 
nathan
>From 75b0c739f3710034ff8910176460c3bfbeb1716c Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Sat, 20 Jul 2024 21:01:29 -0500
Subject: [PATCH v5 01/13] pg_upgrade: retrieve subscription count more
 efficiently

---
 src/bin/pg_upgrade/check.c      |  9 +++----
 src/bin/pg_upgrade/info.c       | 43 +++++++--------------------------
 src/bin/pg_upgrade/pg_upgrade.h |  3 +--
 3 files changed, 13 insertions(+), 42 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 27924159d6..39d14b7b92 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1797,17 +1797,14 @@ check_new_cluster_subscription_configuration(void)
 {
        PGresult   *res;
        PGconn     *conn;
-       int                     nsubs_on_old;
        int                     max_replication_slots;
 
        /* Subscriptions and their dependencies can be migrated since PG17. */
        if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700)
                return;
 
-       nsubs_on_old = count_old_cluster_subscriptions();
-
        /* Quick return if there are no subscriptions to be migrated. */
-       if (nsubs_on_old == 0)
+       if (old_cluster.nsubs == 0)
                return;
 
        prep_status("Checking for new cluster configuration for subscriptions");
@@ -1821,10 +1818,10 @@ check_new_cluster_subscription_configuration(void)
                pg_fatal("could not determine parameter settings on new 
cluster");
 
        max_replication_slots = atoi(PQgetvalue(res, 0, 0));
-       if (nsubs_on_old > max_replication_slots)
+       if (old_cluster.nsubs > max_replication_slots)
                pg_fatal("\"max_replication_slots\" (%d) must be greater than 
or equal to the number of "
                                 "subscriptions (%d) on the old cluster",
-                                max_replication_slots, nsubs_on_old);
+                                max_replication_slots, old_cluster.nsubs);
 
        PQclear(res);
        PQfinish(conn);
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 95c22a7200..e43be79aa5 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -28,7 +28,7 @@ static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
 static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
 static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool 
live_check);
-static void get_db_subscription_count(DbInfo *dbinfo);
+static void get_subscription_count(ClusterInfo *cluster);
 
 
 /*
@@ -293,17 +293,13 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster, bool 
live_check)
 
                get_rel_infos(cluster, pDbInfo);
 
-               /*
-                * Retrieve the logical replication slots infos and the 
subscriptions
-                * count for the old cluster.
-                */
                if (cluster == &old_cluster)
-               {
                        get_old_cluster_logical_slot_infos(pDbInfo, live_check);
-                       get_db_subscription_count(pDbInfo);
-               }
        }
 
+       if (cluster == &old_cluster)
+               get_subscription_count(cluster);
+
        if (cluster == &old_cluster)
                pg_log(PG_VERBOSE, "\nsource databases:");
        else
@@ -750,14 +746,14 @@ count_old_cluster_logical_slots(void)
 /*
  * get_db_subscription_count()
  *
- * Gets the number of subscriptions in the database referred to by "dbinfo".
+ * Gets the number of subscriptions in the cluster.
  *
  * Note: This function will not do anything if the old cluster is pre-PG17.
  * This is because before that the logical slots are not upgraded, so we will
  * not be able to upgrade the logical replication clusters completely.
  */
 static void
-get_db_subscription_count(DbInfo *dbinfo)
+get_subscription_count(ClusterInfo *cluster)
 {
        PGconn     *conn;
        PGresult   *res;
@@ -766,36 +762,15 @@ get_db_subscription_count(DbInfo *dbinfo)
        if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700)
                return;
 
-       conn = connectToServer(&old_cluster, dbinfo->db_name);
+       conn = connectToServer(&old_cluster, "template1");
        res = executeQueryOrDie(conn, "SELECT count(*) "
-                                                       "FROM 
pg_catalog.pg_subscription WHERE subdbid = %u",
-                                                       dbinfo->db_oid);
-       dbinfo->nsubs = atoi(PQgetvalue(res, 0, 0));
+                                                       "FROM 
pg_catalog.pg_subscription");
+       cluster->nsubs = atoi(PQgetvalue(res, 0, 0));
 
        PQclear(res);
        PQfinish(conn);
 }
 
-/*
- * count_old_cluster_subscriptions()
- *
- * Returns the number of subscriptions for all databases.
- *
- * Note: this function always returns 0 if the old_cluster is PG16 and prior
- * because we gather subscriptions only for cluster versions greater than or
- * equal to PG17. See get_db_subscription_count().
- */
-int
-count_old_cluster_subscriptions(void)
-{
-       int                     nsubs = 0;
-
-       for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
-               nsubs += old_cluster.dbarr.dbs[dbnum].nsubs;
-
-       return nsubs;
-}
-
 static void
 free_db_and_rel_infos(DbInfoArr *db_arr)
 {
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 8afe240bdf..e6dbbe6a93 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -197,7 +197,6 @@ typedef struct
                                                                                
         * path */
        RelInfoArr      rel_arr;                /* array of all user relinfos */
        LogicalSlotInfoArr slot_arr;    /* array of all LogicalSlotInfo */
-       int                     nsubs;                  /* number of 
subscriptions */
 } DbInfo;
 
 /*
@@ -296,6 +295,7 @@ typedef struct
        char            major_version_str[64];  /* string PG_VERSION of cluster 
*/
        uint32          bin_version;    /* version returned from pg_ctl */
        const char *tablespace_suffix;  /* directory specification */
+       int                     nsubs;                  /* number of 
subscriptions */
 } ClusterInfo;
 
 
@@ -430,7 +430,6 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db,
                                                          const char 
*new_pgdata);
 void           get_db_rel_and_slot_infos(ClusterInfo *cluster, bool 
live_check);
 int                    count_old_cluster_logical_slots(void);
-int                    count_old_cluster_subscriptions(void);
 
 /* option.c */
 
-- 
2.39.3 (Apple Git-146)

>From d727dad6968d58464bf8044901badf024ade795e Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Tue, 21 May 2024 16:35:19 -0500
Subject: [PATCH v5 02/13] move live_check variable to user_opts

---
 src/bin/pg_upgrade/check.c       | 32 ++++++++++++++++----------------
 src/bin/pg_upgrade/controldata.c |  5 +++--
 src/bin/pg_upgrade/info.c        | 12 +++++-------
 src/bin/pg_upgrade/option.c      |  4 ++--
 src/bin/pg_upgrade/pg_upgrade.c  | 21 ++++++++++-----------
 src/bin/pg_upgrade/pg_upgrade.h  | 13 +++++++------
 6 files changed, 43 insertions(+), 44 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 39d14b7b92..d8888f5053 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -29,7 +29,7 @@ static void check_for_new_tablespace_dir(void);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
 static void check_new_cluster_logical_replication_slots(void);
 static void check_new_cluster_subscription_configuration(void);
-static void check_old_cluster_for_valid_slots(bool live_check);
+static void check_old_cluster_for_valid_slots(void);
 static void check_old_cluster_subscription_state(void);
 
 /*
@@ -555,9 +555,9 @@ fix_path_separator(char *path)
 }
 
 void
-output_check_banner(bool live_check)
+output_check_banner(void)
 {
-       if (user_opts.check && live_check)
+       if (user_opts.live_check)
        {
                pg_log(PG_REPORT,
                           "Performing Consistency Checks on Old Live Server\n"
@@ -573,18 +573,18 @@ output_check_banner(bool live_check)
 
 
 void
-check_and_dump_old_cluster(bool live_check)
+check_and_dump_old_cluster(void)
 {
        /* -- OLD -- */
 
-       if (!live_check)
+       if (!user_opts.live_check)
                start_postmaster(&old_cluster, true);
 
        /*
         * Extract a list of databases, tables, and logical replication slots 
from
         * the old cluster.
         */
-       get_db_rel_and_slot_infos(&old_cluster, live_check);
+       get_db_rel_and_slot_infos(&old_cluster);
 
        init_tablespaces();
 
@@ -605,7 +605,7 @@ check_and_dump_old_cluster(bool live_check)
                 * Logical replication slots can be migrated since PG17. See 
comments
                 * atop get_old_cluster_logical_slot_infos().
                 */
-               check_old_cluster_for_valid_slots(live_check);
+               check_old_cluster_for_valid_slots();
 
                /*
                 * Subscriptions and their dependencies can be migrated since 
PG17.
@@ -652,7 +652,7 @@ check_and_dump_old_cluster(bool live_check)
         */
        if (GET_MAJOR_VERSION(old_cluster.major_version) <= 906)
        {
-               if (user_opts.check)
+               if (user_opts.live_check)
                        old_9_6_invalidate_hash_indexes(&old_cluster, true);
        }
 
@@ -667,7 +667,7 @@ check_and_dump_old_cluster(bool live_check)
        if (!user_opts.check)
                generate_old_dump();
 
-       if (!live_check)
+       if (!user_opts.live_check)
                stop_postmaster(false);
 }
 
@@ -675,7 +675,7 @@ check_and_dump_old_cluster(bool live_check)
 void
 check_new_cluster(void)
 {
-       get_db_rel_and_slot_infos(&new_cluster, false);
+       get_db_rel_and_slot_infos(&new_cluster);
 
        check_new_cluster_is_empty();
 
@@ -826,14 +826,14 @@ check_cluster_versions(void)
 
 
 void
-check_cluster_compatibility(bool live_check)
+check_cluster_compatibility(void)
 {
        /* get/check pg_control data of servers */
-       get_control_data(&old_cluster, live_check);
-       get_control_data(&new_cluster, false);
+       get_control_data(&old_cluster);
+       get_control_data(&new_cluster);
        check_control_data(&old_cluster.controldata, &new_cluster.controldata);
 
-       if (live_check && old_cluster.port == new_cluster.port)
+       if (user_opts.live_check && old_cluster.port == new_cluster.port)
                pg_fatal("When checking a live server, "
                                 "the old and new port numbers must be 
different.");
 }
@@ -1836,7 +1836,7 @@ check_new_cluster_subscription_configuration(void)
  * before shutdown.
  */
 static void
-check_old_cluster_for_valid_slots(bool live_check)
+check_old_cluster_for_valid_slots(void)
 {
        char            output_path[MAXPGPATH];
        FILE       *script = NULL;
@@ -1875,7 +1875,7 @@ check_old_cluster_for_valid_slots(bool live_check)
                         * Note: This can be satisfied only when the old 
cluster has been
                         * shut down, so we skip this for live checks.
                         */
-                       if (!live_check && !slot->caught_up)
+                       if (!user_opts.live_check && !slot->caught_up)
                        {
                                if (script == NULL &&
                                        (script = fopen_priv(output_path, "w")) 
== NULL)
diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c
index 1f0ccea3ed..cf665b9dee 100644
--- a/src/bin/pg_upgrade/controldata.c
+++ b/src/bin/pg_upgrade/controldata.c
@@ -33,7 +33,7 @@
  * return valid xid data for a running server.
  */
 void
-get_control_data(ClusterInfo *cluster, bool live_check)
+get_control_data(ClusterInfo *cluster)
 {
        char            cmd[MAXPGPATH];
        char            bufin[MAX_STRING];
@@ -76,6 +76,7 @@ get_control_data(ClusterInfo *cluster, bool live_check)
        uint32          segno = 0;
        char       *resetwal_bin;
        int                     rc;
+       bool            live_check = (cluster == &old_cluster && 
user_opts.live_check);
 
        /*
         * Because we test the pg_resetwal output as strings, it has to be in
@@ -118,7 +119,7 @@ get_control_data(ClusterInfo *cluster, bool live_check)
        /*
         * Check for clean shutdown
         */
-       if (!live_check || cluster == &new_cluster)
+       if (!live_check)
        {
                /* only pg_controldata outputs the cluster state */
                snprintf(cmd, sizeof(cmd), "\"%s/pg_controldata\" \"%s\"",
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index e43be79aa5..fc94de9ea4 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -27,7 +27,7 @@ static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
 static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool 
live_check);
+static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo);
 static void get_subscription_count(ClusterInfo *cluster);
 
 
@@ -273,11 +273,9 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo 
*db, bool is_new_db)
  *
  * higher level routine to generate dbinfos for the database running
  * on the given "port". Assumes that server is already running.
- *
- * live_check would be used only when the target is the old cluster.
  */
 void
-get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check)
+get_db_rel_and_slot_infos(ClusterInfo *cluster)
 {
        int                     dbnum;
 
@@ -294,7 +292,7 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster, bool 
live_check)
                get_rel_infos(cluster, pDbInfo);
 
                if (cluster == &old_cluster)
-                       get_old_cluster_logical_slot_infos(pDbInfo, live_check);
+                       get_old_cluster_logical_slot_infos(pDbInfo);
        }
 
        if (cluster == &old_cluster)
@@ -641,7 +639,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
  * are included.
  */
 static void
-get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
+get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
 {
        PGconn     *conn;
        PGresult   *res;
@@ -677,7 +675,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool 
live_check)
                                                        "WHERE slot_type = 
'logical' AND "
                                                        "database = 
current_database() AND "
                                                        "temporary IS FALSE;",
-                                                       live_check ? "FALSE" :
+                                                       user_opts.live_check ? 
"FALSE" :
                                                        "(CASE WHEN 
invalidation_reason IS NOT NULL THEN FALSE "
                                                        "ELSE (SELECT 
pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
                                                        "END)");
diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index 548ea4e623..6f41d63eed 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -470,10 +470,10 @@ adjust_data_dir(ClusterInfo *cluster)
  * directory.
  */
 void
-get_sock_dir(ClusterInfo *cluster, bool live_check)
+get_sock_dir(ClusterInfo *cluster)
 {
 #if !defined(WIN32)
-       if (!live_check)
+       if (!user_opts.live_check || cluster == &new_cluster)
                cluster->sockdir = user_opts.socketdir;
        else
        {
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 03eb738fd7..99f3d4543e 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -65,7 +65,7 @@ static void create_new_objects(void);
 static void copy_xact_xlog_xid(void);
 static void set_frozenxids(bool minmxid_only);
 static void make_outputdirs(char *pgdata);
-static void setup(char *argv0, bool *live_check);
+static void setup(char *argv0);
 static void create_logical_replication_slots(void);
 
 ClusterInfo old_cluster,
@@ -88,7 +88,6 @@ int
 main(int argc, char **argv)
 {
        char       *deletion_script_file_name = NULL;
-       bool            live_check = false;
 
        /*
         * pg_upgrade doesn't currently use common/logging.c, but initialize it
@@ -123,18 +122,18 @@ main(int argc, char **argv)
         */
        make_outputdirs(new_cluster.pgdata);
 
-       setup(argv[0], &live_check);
+       setup(argv[0]);
 
-       output_check_banner(live_check);
+       output_check_banner();
 
        check_cluster_versions();
 
-       get_sock_dir(&old_cluster, live_check);
-       get_sock_dir(&new_cluster, false);
+       get_sock_dir(&old_cluster);
+       get_sock_dir(&new_cluster);
 
-       check_cluster_compatibility(live_check);
+       check_cluster_compatibility();
 
-       check_and_dump_old_cluster(live_check);
+       check_and_dump_old_cluster();
 
 
        /* -- NEW -- */
@@ -331,7 +330,7 @@ make_outputdirs(char *pgdata)
 
 
 static void
-setup(char *argv0, bool *live_check)
+setup(char *argv0)
 {
        /*
         * make sure the user has a clean environment, otherwise, we may confuse
@@ -378,7 +377,7 @@ setup(char *argv0, bool *live_check)
                                pg_fatal("There seems to be a postmaster 
servicing the old cluster.\n"
                                                 "Please shutdown that 
postmaster and try again.");
                        else
-                               *live_check = true;
+                               user_opts.live_check = true;
                }
        }
 
@@ -660,7 +659,7 @@ create_new_objects(void)
                set_frozenxids(true);
 
        /* update new_cluster info now that we have objects in the databases */
-       get_db_rel_and_slot_infos(&new_cluster, false);
+       get_db_rel_and_slot_infos(&new_cluster);
 }
 
 /*
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index e6dbbe6a93..08b2ad26d7 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -322,6 +322,7 @@ typedef struct
 typedef struct
 {
        bool            check;                  /* check clusters only, don't 
change any data */
+       bool            live_check;             /* check clusters only, old 
server is running */
        bool            do_sync;                /* flush changes to disk */
        transferMode transfer_mode; /* copy files or link them? */
        int                     jobs;                   /* number of 
processes/threads to use */
@@ -366,20 +367,20 @@ extern OSInfo os_info;
 
 /* check.c */
 
-void           output_check_banner(bool live_check);
-void           check_and_dump_old_cluster(bool live_check);
+void           output_check_banner(void);
+void           check_and_dump_old_cluster(void);
 void           check_new_cluster(void);
 void           report_clusters_compatible(void);
 void           issue_warnings_and_set_wal_level(void);
 void           output_completion_banner(char *deletion_script_file_name);
 void           check_cluster_versions(void);
-void           check_cluster_compatibility(bool live_check);
+void           check_cluster_compatibility(void);
 void           create_script_for_old_cluster_deletion(char 
**deletion_script_file_name);
 
 
 /* controldata.c */
 
-void           get_control_data(ClusterInfo *cluster, bool live_check);
+void           get_control_data(ClusterInfo *cluster);
 void           check_control_data(ControlData *oldctrl, ControlData *newctrl);
 void           disable_old_cluster(void);
 
@@ -428,14 +429,14 @@ void              check_loadable_libraries(void);
 FileNameMap *gen_db_file_maps(DbInfo *old_db,
                                                          DbInfo *new_db, int 
*nmaps, const char *old_pgdata,
                                                          const char 
*new_pgdata);
-void           get_db_rel_and_slot_infos(ClusterInfo *cluster, bool 
live_check);
+void           get_db_rel_and_slot_infos(ClusterInfo *cluster);
 int                    count_old_cluster_logical_slots(void);
 
 /* option.c */
 
 void           parseCommandLine(int argc, char *argv[]);
 void           adjust_data_dir(ClusterInfo *cluster);
-void           get_sock_dir(ClusterInfo *cluster, bool live_check);
+void           get_sock_dir(ClusterInfo *cluster);
 
 /* relfilenumber.c */
 
-- 
2.39.3 (Apple Git-146)

>From c955d1d1d769f8864a43cf98dc9dbacf5e6c7f9c Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 11:02:44 -0500
Subject: [PATCH v5 03/13] introduce framework for parallelizing pg_upgrade
 tasks

---
 src/bin/pg_upgrade/Makefile      |   1 +
 src/bin/pg_upgrade/async.c       | 327 +++++++++++++++++++++++++++++++
 src/bin/pg_upgrade/meson.build   |   1 +
 src/bin/pg_upgrade/pg_upgrade.h  |  16 ++
 src/tools/pgindent/typedefs.list |   4 +
 5 files changed, 349 insertions(+)
 create mode 100644 src/bin/pg_upgrade/async.c

diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile
index bde91e2beb..3bc4f5d740 100644
--- a/src/bin/pg_upgrade/Makefile
+++ b/src/bin/pg_upgrade/Makefile
@@ -12,6 +12,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = \
        $(WIN32RES) \
+       async.o \
        check.o \
        controldata.o \
        dump.o \
diff --git a/src/bin/pg_upgrade/async.c b/src/bin/pg_upgrade/async.c
new file mode 100644
index 0000000000..742e9df37c
--- /dev/null
+++ b/src/bin/pg_upgrade/async.c
@@ -0,0 +1,327 @@
+/*
+ * async.c
+ *
+ * parallelization via libpq's async APIs
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ * src/bin/pg_upgrade/async.c
+ */
+
+#include "postgres_fe.h"
+
+#include "common/connect.h"
+#include "fe_utils/string_utils.h"
+#include "pg_upgrade.h"
+
+static int     dbs_complete;
+static int     dbs_processing;
+
+typedef struct AsyncTaskCallbacks
+{
+       AsyncTaskGetQueryCB query_cb;
+       AsyncTaskProcessCB process_cb;
+       char       *query;
+       bool            free_result;
+       void       *arg;
+} AsyncTaskCallbacks;
+
+typedef struct AsyncTask
+{
+       AsyncTaskCallbacks *cbs;
+       int                     num_cb_sets;
+} AsyncTask;
+
+typedef enum
+{
+       FREE,
+       CONNECTING,
+       SETTING_SEARCH_PATH,
+       RUNNING_QUERY,
+} AsyncSlotState;
+
+typedef struct
+{
+       AsyncSlotState state;
+       int                     db;
+       int                     query;
+       PGconn     *conn;
+} AsyncSlot;
+
+AsyncTask *
+async_task_create(void)
+{
+       return pg_malloc0(sizeof(AsyncTask));
+}
+
+void
+async_task_free(AsyncTask *task)
+{
+       for (int i = 0; i < task->num_cb_sets; i++)
+       {
+               if (task->cbs[i].query)
+                       pg_free(task->cbs[i].query);
+       }
+
+       if (task->cbs)
+               pg_free(task->cbs);
+
+       pg_free(task);
+}
+
+void
+async_task_add_step(AsyncTask *task,
+                                       AsyncTaskGetQueryCB query_cb,
+                                       AsyncTaskProcessCB process_cb, bool 
free_result,
+                                       void *arg)
+{
+       AsyncTaskCallbacks *new_cbs;
+
+       task->cbs = pg_realloc(task->cbs,
+                                                  ++task->num_cb_sets * 
sizeof(AsyncTaskCallbacks));
+
+       new_cbs = &task->cbs[task->num_cb_sets - 1];
+       new_cbs->query_cb = query_cb;
+       new_cbs->process_cb = process_cb;
+       new_cbs->query = NULL;
+       new_cbs->free_result = free_result;
+       new_cbs->arg = arg;
+}
+
+static void
+conn_failure(PGconn *conn)
+{
+       pg_fatal("connection failure: %s", PQerrorMessage(conn));
+}
+
+static void
+start_conn(const ClusterInfo *cluster, AsyncSlot *slot)
+{
+       PQExpBufferData conn_opts;
+
+       /* Build connection string with proper quoting */
+       initPQExpBuffer(&conn_opts);
+       appendPQExpBufferStr(&conn_opts, "dbname=");
+       appendConnStrVal(&conn_opts, cluster->dbarr.dbs[slot->db].db_name);
+       appendPQExpBufferStr(&conn_opts, " user=");
+       appendConnStrVal(&conn_opts, os_info.user);
+       appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
+       if (cluster->sockdir)
+       {
+               appendPQExpBufferStr(&conn_opts, " host=");
+               appendConnStrVal(&conn_opts, cluster->sockdir);
+       }
+
+       slot->conn = PQconnectStart(conn_opts.data);
+       termPQExpBuffer(&conn_opts);
+
+       if (!slot->conn)
+               conn_failure(slot->conn);
+}
+
+static void
+dispatch_query(const ClusterInfo *cluster, AsyncSlot *slot,
+                          const AsyncTask *task)
+{
+       AsyncTaskCallbacks *cbs = &task->cbs[slot->query];
+
+       if (!cbs->query)
+               cbs->query = cbs->query_cb(cbs->arg);
+
+       if (!PQsendQuery(slot->conn, cbs->query))
+               conn_failure(slot->conn);
+}
+
+static PGresult *
+get_last_result(PGconn *conn)
+{
+       PGresult   *tmp;
+       PGresult   *res = NULL;
+
+       while ((tmp = PQgetResult(conn)) != NULL)
+       {
+               PQclear(res);
+               res = tmp;
+               if (PQstatus(conn) == CONNECTION_BAD)
+                       conn_failure(conn);
+       }
+
+       if (PQresultStatus(res) != PGRES_COMMAND_OK &&
+               PQresultStatus(res) != PGRES_TUPLES_OK)
+               conn_failure(conn);
+
+       return res;
+}
+
+static void
+process_query_result(const ClusterInfo *cluster, AsyncSlot *slot,
+                                        const AsyncTask *task)
+{
+       AsyncTaskCallbacks *cbs = &task->cbs[slot->query];
+       AsyncTaskProcessCB process_cb = cbs->process_cb;
+       DbInfo     *dbinfo = &cluster->dbarr.dbs[slot->db];
+       PGresult   *res = get_last_result(slot->conn);
+
+       (*process_cb) (dbinfo, res, cbs->arg);
+
+       if (cbs->free_result)
+               PQclear(res);
+}
+
+static void
+process_slot(const ClusterInfo *cluster, AsyncSlot *slot, const AsyncTask 
*task)
+{
+       switch (slot->state)
+       {
+               case FREE:
+                       if (dbs_processing >= cluster->dbarr.ndbs)
+                               return;
+                       slot->db = dbs_processing++;
+                       slot->state = CONNECTING;
+                       start_conn(cluster, slot);
+                       return;
+
+               case CONNECTING:
+                       if (PQconnectPoll(slot->conn) == PGRES_POLLING_FAILED)
+                               conn_failure(slot->conn);
+                       if (PQconnectPoll(slot->conn) != PGRES_POLLING_OK)
+                               return;
+                       slot->state = SETTING_SEARCH_PATH;
+                       if (!PQsendQuery(slot->conn, 
ALWAYS_SECURE_SEARCH_PATH_SQL))
+                               conn_failure(slot->conn);
+                       return;
+
+               case SETTING_SEARCH_PATH:
+                       if (!PQconsumeInput(slot->conn))
+                               conn_failure(slot->conn);
+                       if (PQisBusy(slot->conn))
+                               return;
+                       PQclear(get_last_result(slot->conn));
+                       slot->state = RUNNING_QUERY;
+                       dispatch_query(cluster, slot, task);
+                       return;
+
+               case RUNNING_QUERY:
+                       if (!PQconsumeInput(slot->conn))
+                               conn_failure(slot->conn);
+                       if (PQisBusy(slot->conn))
+                               return;
+                       process_query_result(cluster, slot, task);
+                       if (++slot->query >= task->num_cb_sets)
+                       {
+                               dbs_complete++;
+                               PQfinish(slot->conn);
+                               memset(slot, 0, sizeof(AsyncSlot));
+                               return;
+                       }
+                       dispatch_query(cluster, slot, task);
+                       return;
+       }
+}
+
+/*
+ * Wait on the slots to either finish connecting or to receive query results if
+ * possible.  This avoids a tight loop in async_task_run().
+ */
+static void
+wait_on_slots(AsyncSlot *slots, int numslots)
+{
+       fd_set          input_mask;
+       fd_set          output_mask;
+       fd_set          except_mask;
+       int                     maxFd = 0;
+
+       FD_ZERO(&input_mask);
+       FD_ZERO(&output_mask);
+       FD_ZERO(&except_mask);
+
+       for (int i = 0; i < numslots; i++)
+       {
+               int                     sock;
+               bool            read = false;
+
+               switch (slots[i].state)
+               {
+                       case FREE:
+
+                               /*
+                                * If we see a free slot, return right away so 
that it can be
+                                * reused immediately for the next database.  
This might cause
+                                * us to spin more than necessary as we finish 
processing the
+                                * last few databases, but that shouldn't cause 
too much harm.
+                                */
+                               return;
+
+                       case CONNECTING:
+
+                               /*
+                                * If we are waiting for the connection to 
establish, choose
+                                * whether to wait for reading or for writing 
on the socket as
+                                * appropriate.  If neither apply, just return 
immediately so
+                                * that we can handle the slot.
+                                */
+                               {
+                                       PostgresPollingStatusType status;
+
+                                       status = PQconnectPoll(slots[i].conn);
+                                       if (status == PGRES_POLLING_READING)
+                                               read = true;
+                                       else if (status != 
PGRES_POLLING_WRITING)
+                                               return;
+                               }
+                               break;
+
+                       case SETTING_SEARCH_PATH:
+                       case RUNNING_QUERY:
+
+                               /*
+                                * If we've sent a query, we must wait for the 
socket to be
+                                * read-ready.  Note that process_slot() 
handles calling
+                                * PQconsumeInput() as required.
+                                */
+                               read = true;
+                               break;
+               }
+
+               /*
+                * If there's some problem retrieving the socket, just pretend 
this
+                * slot doesn't exist.  We don't expect this to happen 
regularly in
+                * practice, so it seems unlikely to cause too much harm.
+                */
+               sock = PQsocket(slots[i].conn);
+               if (sock < 0)
+                       continue;
+
+               /*
+                * Add the socket to the set.
+                */
+               FD_SET(sock, read ? &input_mask : &output_mask);
+               FD_SET(sock, &except_mask);
+               maxFd = Max(maxFd, sock);
+       }
+
+       /*
+        * If we found socket(s) to wait on, wait.
+        */
+       if (maxFd != 0)
+               (void) select(maxFd + 1, &input_mask, &output_mask, 
&except_mask, NULL);
+}
+
+void
+async_task_run(const AsyncTask *task, const ClusterInfo *cluster)
+{
+       int                     jobs = Max(1, user_opts.jobs);
+       AsyncSlot  *slots = pg_malloc0(sizeof(AsyncSlot) * jobs);
+
+       dbs_complete = 0;
+       dbs_processing = 0;
+
+       while (dbs_complete < cluster->dbarr.ndbs)
+       {
+               for (int i = 0; i < jobs; i++)
+                       process_slot(cluster, &slots[i], task);
+
+               wait_on_slots(slots, jobs);
+       }
+
+       pg_free(slots);
+}
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 9825fa3305..9eb48e176c 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -1,6 +1,7 @@
 # Copyright (c) 2022-2024, PostgreSQL Global Development Group
 
 pg_upgrade_sources = files(
+  'async.c',
   'check.c',
   'controldata.c',
   'dump.c',
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 08b2ad26d7..dea418ec3f 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -494,3 +494,19 @@ void               parallel_transfer_all_new_dbs(DbInfoArr 
*old_db_arr, DbInfoArr *new_db_arr
                                                                                
  char *old_pgdata, char *new_pgdata,
                                                                                
  char *old_tablespace);
 bool           reap_child(bool wait_for_child);
+
+/* async.c */
+
+typedef char *(*AsyncTaskGetQueryCB) (void *arg);
+typedef void (*AsyncTaskProcessCB) (DbInfo *dbinfo, PGresult *res, void *arg);
+
+/* struct definition is private to async.c */
+typedef struct AsyncTask AsyncTask;
+
+AsyncTask  *async_task_create(void);
+void           async_task_add_step(AsyncTask *task,
+                                                               
AsyncTaskGetQueryCB query_cb,
+                                                               
AsyncTaskProcessCB process_cb, bool free_result,
+                                                               void *arg);
+void           async_task_run(const AsyncTask *task, const ClusterInfo 
*cluster);
+void           async_task_free(AsyncTask *task);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index b4d7f9217c..a2330f4346 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -153,6 +153,10 @@ ArrayMetaState
 ArraySubWorkspace
 ArrayToken
 ArrayType
+AsyncSlot
+AsyncSlotState
+AsyncTask
+AsyncTaskCallbacks
 AsyncQueueControl
 AsyncQueueEntry
 AsyncRequest
-- 
2.39.3 (Apple Git-146)

>From 917a33304f1eb20065351075f487d2deb4f8e976 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 17:21:19 -0500
Subject: [PATCH v5 04/13] use new pg_upgrade async API for subscription state
 checks

---
 src/bin/pg_upgrade/check.c | 204 ++++++++++++++++++++-----------------
 1 file changed, 110 insertions(+), 94 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index d8888f5053..41a23e7efe 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1903,6 +1903,79 @@ check_old_cluster_for_valid_slots(void)
        check_ok();
 }
 
+/* private state for subscription state checks */
+struct substate_info
+{
+       FILE       *script;
+       char            output_path[MAXPGPATH];
+};
+
+/*
+ * We don't allow upgrade if there is a risk of dangling slot or origin
+ * corresponding to initial sync after upgrade.
+ *
+ * A slot/origin not created yet refers to the 'i' (initialize) state, while
+ * 'r' (ready) state refers to a slot/origin created previously but already
+ * dropped. These states are supported for pg_upgrade. The other states listed
+ * below are not supported:
+ *
+ * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state would
+ * retain a replication slot, which could not be dropped by the sync worker
+ * spawned after the upgrade because the subscription ID used for the slot name
+ * won't match anymore.
+ *
+ * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state would
+ * retain the replication origin when there is a failure in tablesync worker
+ * immediately after dropping the replication slot in the publisher.
+ *
+ * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on a
+ * relation upgraded while in this state would expect an origin ID with the OID
+ * of the subscription used before the upgrade, causing it to fail.
+ *
+ * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and SUBREL_STATE_UNKNOWN:
+ * These states are not stored in the catalog, so we need not allow these
+ * states.
+ */
+static char *
+sub_query(void *arg)
+{
+       return pg_strdup("SELECT r.srsubstate, s.subname, n.nspname, c.relname "
+                                        "FROM pg_catalog.pg_subscription_rel r 
"
+                                        "LEFT JOIN pg_catalog.pg_subscription 
s"
+                                        "   ON r.srsubid = s.oid "
+                                        "LEFT JOIN pg_catalog.pg_class c"
+                                        "   ON r.srrelid = c.oid "
+                                        "LEFT JOIN pg_catalog.pg_namespace n"
+                                        "   ON c.relnamespace = n.oid "
+                                        "WHERE r.srsubstate NOT IN ('i', 'r') "
+                                        "ORDER BY s.subname");
+}
+
+static void
+sub_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       struct substate_info *state = (struct substate_info *) arg;
+       int                     ntup = PQntuples(res);
+       int                     i_srsubstate = PQfnumber(res, "srsubstate");
+       int                     i_subname = PQfnumber(res, "subname");
+       int                     i_nspname = PQfnumber(res, "nspname");
+       int                     i_relname = PQfnumber(res, "relname");
+
+       for (int i = 0; i < ntup; i++)
+       {
+               if (state->script == NULL &&
+                       (state->script = fopen_priv(state->output_path, "w")) 
== NULL)
+                       pg_fatal("could not open file \"%s\": %m", 
state->output_path);
+
+               fprintf(state->script, "The table sync state \"%s\" is not 
allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" 
relation:\"%s\"\n",
+                               PQgetvalue(res, i, i_srsubstate),
+                               dbinfo->db_name,
+                               PQgetvalue(res, i, i_subname),
+                               PQgetvalue(res, i, i_nspname),
+                               PQgetvalue(res, i, i_relname));
+       }
+}
+
 /*
  * check_old_cluster_subscription_state()
  *
@@ -1913,115 +1986,58 @@ check_old_cluster_for_valid_slots(void)
 static void
 check_old_cluster_subscription_state(void)
 {
-       FILE       *script = NULL;
-       char            output_path[MAXPGPATH];
+       AsyncTask  *task = async_task_create();
+       struct substate_info state;
+       PGresult   *res;
+       PGconn     *conn;
        int                     ntup;
 
        prep_status("Checking for subscription state");
 
-       snprintf(output_path, sizeof(output_path), "%s/%s",
+       state.script = NULL;
+       snprintf(state.output_path, sizeof(state.output_path), "%s/%s",
                         log_opts.basedir,
                         "subs_invalid.txt");
-       for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               DbInfo     *active_db = &old_cluster.dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(&old_cluster, 
active_db->db_name);
-
-               /* We need to check for pg_replication_origin only once. */
-               if (dbnum == 0)
-               {
-                       /*
-                        * Check that all the subscriptions have their 
respective
-                        * replication origin.
-                        */
-                       res = executeQueryOrDie(conn,
-                                                                       "SELECT 
d.datname, s.subname "
-                                                                       "FROM 
pg_catalog.pg_subscription s "
-                                                                       "LEFT 
OUTER JOIN pg_catalog.pg_replication_origin o "
-                                                                       "       
ON o.roname = 'pg_' || s.oid "
-                                                                       "INNER 
JOIN pg_catalog.pg_database d "
-                                                                       "       
ON d.oid = s.subdbid "
-                                                                       "WHERE 
o.roname IS NULL;");
-
-                       ntup = PQntuples(res);
-                       for (int i = 0; i < ntup; i++)
-                       {
-                               if (script == NULL && (script = 
fopen_priv(output_path, "w")) == NULL)
-                                       pg_fatal("could not open file \"%s\": 
%m", output_path);
-                               fprintf(script, "The replication origin is 
missing for database:\"%s\" subscription:\"%s\"\n",
-                                               PQgetvalue(res, i, 0),
-                                               PQgetvalue(res, i, 1));
-                       }
-                       PQclear(res);
-               }
 
-               /*
-                * We don't allow upgrade if there is a risk of dangling slot or
-                * origin corresponding to initial sync after upgrade.
-                *
-                * A slot/origin not created yet refers to the 'i' (initialize) 
state,
-                * while 'r' (ready) state refers to a slot/origin created 
previously
-                * but already dropped. These states are supported for 
pg_upgrade. The
-                * other states listed below are not supported:
-                *
-                * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this 
state
-                * would retain a replication slot, which could not be dropped 
by the
-                * sync worker spawned after the upgrade because the 
subscription ID
-                * used for the slot name won't match anymore.
-                *
-                * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this 
state
-                * would retain the replication origin when there is a failure 
in
-                * tablesync worker immediately after dropping the replication 
slot in
-                * the publisher.
-                *
-                * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to 
work on
-                * a relation upgraded while in this state would expect an 
origin ID
-                * with the OID of the subscription used before the upgrade, 
causing
-                * it to fail.
-                *
-                * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
-                * SUBREL_STATE_UNKNOWN: These states are not stored in the 
catalog,
-                * so we need not allow these states.
-                */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT 
r.srsubstate, s.subname, n.nspname, c.relname "
-                                                               "FROM 
pg_catalog.pg_subscription_rel r "
-                                                               "LEFT JOIN 
pg_catalog.pg_subscription s"
-                                                               "       ON 
r.srsubid = s.oid "
-                                                               "LEFT JOIN 
pg_catalog.pg_class c"
-                                                               "       ON 
r.srrelid = c.oid "
-                                                               "LEFT JOIN 
pg_catalog.pg_namespace n"
-                                                               "       ON 
c.relnamespace = n.oid "
-                                                               "WHERE 
r.srsubstate NOT IN ('i', 'r') "
-                                                               "ORDER BY 
s.subname");
-
-               ntup = PQntuples(res);
-               for (int i = 0; i < ntup; i++)
-               {
-                       if (script == NULL && (script = fopen_priv(output_path, 
"w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
+       /*
+        * Check that all the subscriptions have their respective replication
+        * origin.  This check only needs to run once.
+        */
+       conn = connectToServer(&old_cluster, old_cluster.dbarr.dbs[0].db_name);
+       res = executeQueryOrDie(conn,
+                                                       "SELECT d.datname, 
s.subname "
+                                                       "FROM 
pg_catalog.pg_subscription s "
+                                                       "LEFT OUTER JOIN 
pg_catalog.pg_replication_origin o "
+                                                       "       ON o.roname = 
'pg_' || s.oid "
+                                                       "INNER JOIN 
pg_catalog.pg_database d "
+                                                       "       ON d.oid = 
s.subdbid "
+                                                       "WHERE o.roname IS 
NULL;");
+       ntup = PQntuples(res);
+       for (int i = 0; i < ntup; i++)
+       {
+               if (state.script == NULL &&
+                       (state.script = fopen_priv(state.output_path, "w")) == 
NULL)
+                       pg_fatal("could not open file \"%s\": %m", 
state.output_path);
+               fprintf(state.script, "The replication origin is missing for 
database:\"%s\" subscription:\"%s\"\n",
+                               PQgetvalue(res, i, 0),
+                               PQgetvalue(res, i, 1));
+       }
+       PQclear(res);
+       PQfinish(conn);
 
-                       fprintf(script, "The table sync state \"%s\" is not 
allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" 
relation:\"%s\"\n",
-                                       PQgetvalue(res, i, 0),
-                                       active_db->db_name,
-                                       PQgetvalue(res, i, 1),
-                                       PQgetvalue(res, i, 2),
-                                       PQgetvalue(res, i, 3));
-               }
+       async_task_add_step(task, sub_query, sub_process, true, &state);
 
-               PQclear(res);
-               PQfinish(conn);
-       }
+       async_task_run(task, &old_cluster);
+       async_task_free(task);
 
-       if (script)
+       if (state.script)
        {
-               fclose(script);
+               fclose(state.script);
                pg_log(PG_REPORT, "fatal");
                pg_fatal("Your installation contains subscriptions without 
origin or having relations not in i (initialize) or r (ready) state.\n"
                                 "You can allow the initial sync to finish for 
all relations and then restart the upgrade.\n"
                                 "A list of the problematic subscriptions is in 
the file:\n"
-                                "    %s", output_path);
+                                "    %s", state.output_path);
        }
        else
                check_ok();
-- 
2.39.3 (Apple Git-146)

>From e62e03300dc2e207c9e0e22098fea6d93c9a909e Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 21:09:33 -0500
Subject: [PATCH v5 05/13] use new pg_upgrade async API for retrieving relinfos

---
 src/bin/pg_upgrade/info.c | 238 ++++++++++++++++++--------------------
 1 file changed, 110 insertions(+), 128 deletions(-)

diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index fc94de9ea4..798d96ef43 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -11,6 +11,7 @@
 
 #include "access/transam.h"
 #include "catalog/pg_class_d.h"
+#include "pqexpbuffer.h"
 #include "pg_upgrade.h"
 
 static void create_rel_filename_map(const char *old_data, const char *new_data,
@@ -22,13 +23,15 @@ static void report_unmatched_relation(const RelInfo *rel, 
const DbInfo *db,
 static void free_db_and_rel_infos(DbInfoArr *db_arr);
 static void get_template0_info(ClusterInfo *cluster);
 static void get_db_infos(ClusterInfo *cluster);
-static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
+static char *get_rel_infos_query(void *arg);
+static void get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg);
 static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
 static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo);
 static void get_subscription_count(ClusterInfo *cluster);
+static char *get_old_cluster_logical_slot_infos_query(void *arg);
+static void get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult 
*res, void *arg);
 
 
 /*
@@ -277,7 +280,7 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo 
*db, bool is_new_db)
 void
 get_db_rel_and_slot_infos(ClusterInfo *cluster)
 {
-       int                     dbnum;
+       AsyncTask  *task = async_task_create();
 
        if (cluster->dbarr.dbs != NULL)
                free_db_and_rel_infos(&cluster->dbarr);
@@ -285,15 +288,19 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster)
        get_template0_info(cluster);
        get_db_infos(cluster);
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               DbInfo     *pDbInfo = &cluster->dbarr.dbs[dbnum];
-
-               get_rel_infos(cluster, pDbInfo);
+       async_task_add_step(task,
+                                               get_rel_infos_query,
+                                               get_rel_infos_result,
+                                               true, NULL);
+       if (cluster == &old_cluster &&
+               GET_MAJOR_VERSION(cluster->major_version) > 1600)
+               async_task_add_step(task,
+                                                       
get_old_cluster_logical_slot_infos_query,
+                                                       
get_old_cluster_logical_slot_infos_result,
+                                                       true, cluster);
 
-               if (cluster == &old_cluster)
-                       get_old_cluster_logical_slot_infos(pDbInfo);
-       }
+       async_task_run(task, cluster);
+       async_task_free(task);
 
        if (cluster == &old_cluster)
                get_subscription_count(cluster);
@@ -443,32 +450,12 @@ get_db_infos(ClusterInfo *cluster)
  * Note: the resulting RelInfo array is assumed to be sorted by OID.
  * This allows later processing to match up old and new databases efficiently.
  */
-static void
-get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
+static char *
+get_rel_infos_query(void *arg)
 {
-       PGconn     *conn = connectToServer(cluster,
-                                                                          
dbinfo->db_name);
-       PGresult   *res;
-       RelInfo    *relinfos;
-       int                     ntups;
-       int                     relnum;
-       int                     num_rels = 0;
-       char       *nspname = NULL;
-       char       *relname = NULL;
-       char       *tablespace = NULL;
-       int                     i_spclocation,
-                               i_nspname,
-                               i_relname,
-                               i_reloid,
-                               i_indtable,
-                               i_toastheap,
-                               i_relfilenumber,
-                               i_reltablespace;
-       char            query[QUERY_ALLOC];
-       char       *last_namespace = NULL,
-                          *last_tablespace = NULL;
+       PQExpBufferData query;
 
-       query[0] = '\0';                        /* initialize query string to 
empty */
+       initPQExpBuffer(&query);
 
        /*
         * Create a CTE that collects OIDs of regular user tables and matviews,
@@ -480,34 +467,34 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
         * output, so we have to copy that system table.  It's easiest to do 
that
         * by treating it as a user table.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
-                        "WITH regular_heap (reloid, indtable, toastheap) AS ( "
-                        "  SELECT c.oid, 0::oid, 0::oid "
-                        "  FROM pg_catalog.pg_class c JOIN 
pg_catalog.pg_namespace n "
-                        "         ON c.relnamespace = n.oid "
-                        "  WHERE relkind IN (" CppAsString2(RELKIND_RELATION) 
", "
-                        CppAsString2(RELKIND_MATVIEW) ") AND "
+       appendPQExpBuffer(&query,
+                                         "WITH regular_heap (reloid, indtable, 
toastheap) AS ( "
+                                         "  SELECT c.oid, 0::oid, 0::oid "
+                                         "  FROM pg_catalog.pg_class c JOIN 
pg_catalog.pg_namespace n "
+                                         "         ON c.relnamespace = n.oid "
+                                         "  WHERE relkind IN (" 
CppAsString2(RELKIND_RELATION) ", "
+                                         CppAsString2(RELKIND_MATVIEW) ") AND "
        /* exclude possible orphaned temp tables */
-                        "    ((n.nspname !~ '^pg_temp_' AND "
-                        "      n.nspname !~ '^pg_toast_temp_' AND "
-                        "      n.nspname NOT IN ('pg_catalog', 
'information_schema', "
-                        "                        'binary_upgrade', 'pg_toast') 
AND "
-                        "      c.oid >= %u::pg_catalog.oid) OR "
-                        "     (n.nspname = 'pg_catalog' AND "
-                        "      relname IN ('pg_largeobject') ))), ",
-                        FirstNormalObjectId);
+                                         "    ((n.nspname !~ '^pg_temp_' AND "
+                                         "      n.nspname !~ '^pg_toast_temp_' 
AND "
+                                         "      n.nspname NOT IN 
('pg_catalog', 'information_schema', "
+                                         "                        
'binary_upgrade', 'pg_toast') AND "
+                                         "      c.oid >= %u::pg_catalog.oid) 
OR "
+                                         "     (n.nspname = 'pg_catalog' AND "
+                                         "      relname IN ('pg_largeobject') 
))), ",
+                                         FirstNormalObjectId);
 
        /*
         * Add a CTE that collects OIDs of toast tables belonging to the tables
         * selected by the regular_heap CTE.  (We have to do this separately
         * because the namespace-name rules above don't work for toast tables.)
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
-                        "  toast_heap (reloid, indtable, toastheap) AS ( "
-                        "  SELECT c.reltoastrelid, 0::oid, c.oid "
-                        "  FROM regular_heap JOIN pg_catalog.pg_class c "
-                        "      ON regular_heap.reloid = c.oid "
-                        "  WHERE c.reltoastrelid != 0), ");
+       appendPQExpBufferStr(&query,
+                                                "  toast_heap (reloid, 
indtable, toastheap) AS ( "
+                                                "  SELECT c.reltoastrelid, 
0::oid, c.oid "
+                                                "  FROM regular_heap JOIN 
pg_catalog.pg_class c "
+                                                "      ON regular_heap.reloid 
= c.oid "
+                                                "  WHERE c.reltoastrelid != 
0), ");
 
        /*
         * Add a CTE that collects OIDs of all valid indexes on the previously
@@ -515,53 +502,61 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
         * Testing indisready is necessary in 9.2, and harmless in earlier/later
         * versions.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
-                        "  all_index (reloid, indtable, toastheap) AS ( "
-                        "  SELECT indexrelid, indrelid, 0::oid "
-                        "  FROM pg_catalog.pg_index "
-                        "  WHERE indisvalid AND indisready "
-                        "    AND indrelid IN "
-                        "        (SELECT reloid FROM regular_heap "
-                        "         UNION ALL "
-                        "         SELECT reloid FROM toast_heap)) ");
+       appendPQExpBufferStr(&query,
+                                                "  all_index (reloid, 
indtable, toastheap) AS ( "
+                                                "  SELECT indexrelid, 
indrelid, 0::oid "
+                                                "  FROM pg_catalog.pg_index "
+                                                "  WHERE indisvalid AND 
indisready "
+                                                "    AND indrelid IN "
+                                                "        (SELECT reloid FROM 
regular_heap "
+                                                "         UNION ALL "
+                                                "         SELECT reloid FROM 
toast_heap)) ");
 
        /*
         * And now we can write the query that retrieves the data we want for 
each
         * heap and index relation.  Make sure result is sorted by OID.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
-                        "SELECT all_rels.*, n.nspname, c.relname, "
-                        "  c.relfilenode, c.reltablespace, "
-                        "  pg_catalog.pg_tablespace_location(t.oid) AS 
spclocation "
-                        "FROM (SELECT * FROM regular_heap "
-                        "      UNION ALL "
-                        "      SELECT * FROM toast_heap "
-                        "      UNION ALL "
-                        "      SELECT * FROM all_index) all_rels "
-                        "  JOIN pg_catalog.pg_class c "
-                        "      ON all_rels.reloid = c.oid "
-                        "  JOIN pg_catalog.pg_namespace n "
-                        "     ON c.relnamespace = n.oid "
-                        "  LEFT OUTER JOIN pg_catalog.pg_tablespace t "
-                        "     ON c.reltablespace = t.oid "
-                        "ORDER BY 1;");
-
-       res = executeQueryOrDie(conn, "%s", query);
-
-       ntups = PQntuples(res);
-
-       relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+       appendPQExpBufferStr(&query,
+                                                "SELECT all_rels.*, n.nspname, 
c.relname, "
+                                                "  c.relfilenode, 
c.reltablespace, "
+                                                "  
pg_catalog.pg_tablespace_location(t.oid) AS spclocation "
+                                                "FROM (SELECT * FROM 
regular_heap "
+                                                "      UNION ALL "
+                                                "      SELECT * FROM 
toast_heap "
+                                                "      UNION ALL "
+                                                "      SELECT * FROM 
all_index) all_rels "
+                                                "  JOIN pg_catalog.pg_class c "
+                                                "      ON all_rels.reloid = 
c.oid "
+                                                "  JOIN 
pg_catalog.pg_namespace n "
+                                                "     ON c.relnamespace = 
n.oid "
+                                                "  LEFT OUTER JOIN 
pg_catalog.pg_tablespace t "
+                                                "     ON c.reltablespace = 
t.oid "
+                                                "ORDER BY 1;");
+
+       return query.data;
+}
 
-       i_reloid = PQfnumber(res, "reloid");
-       i_indtable = PQfnumber(res, "indtable");
-       i_toastheap = PQfnumber(res, "toastheap");
-       i_nspname = PQfnumber(res, "nspname");
-       i_relname = PQfnumber(res, "relname");
-       i_relfilenumber = PQfnumber(res, "relfilenode");
-       i_reltablespace = PQfnumber(res, "reltablespace");
-       i_spclocation = PQfnumber(res, "spclocation");
+static void
+get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       int                     ntups = PQntuples(res);
+       RelInfo    *relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+       int                     i_reloid = PQfnumber(res, "reloid");
+       int                     i_indtable = PQfnumber(res, "indtable");
+       int                     i_toastheap = PQfnumber(res, "toastheap");
+       int                     i_nspname = PQfnumber(res, "nspname");
+       int                     i_relname = PQfnumber(res, "relname");
+       int                     i_relfilenumber = PQfnumber(res, "relfilenode");
+       int                     i_reltablespace = PQfnumber(res, 
"reltablespace");
+       int                     i_spclocation = PQfnumber(res, "spclocation");
+       int                     num_rels = 0;
+       char       *nspname = NULL;
+       char       *relname = NULL;
+       char       *tablespace = NULL;
+       char       *last_namespace = NULL;
+       char       *last_tablespace = NULL;
 
-       for (relnum = 0; relnum < ntups; relnum++)
+       for (int relnum = 0; relnum < ntups; relnum++)
        {
                RelInfo    *curr = &relinfos[num_rels++];
 
@@ -614,9 +609,6 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
                        /* A zero reltablespace oid indicates the database 
tablespace. */
                        curr->tablespace = dbinfo->db_tablespace;
        }
-       PQclear(res);
-
-       PQfinish(conn);
 
        dbinfo->rel_arr.rels = relinfos;
        dbinfo->rel_arr.nrels = num_rels;
@@ -638,20 +630,9 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
  * check_old_cluster_for_valid_slots() would raise a FATAL error if such slots
  * are included.
  */
-static void
-get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
+static char *
+get_old_cluster_logical_slot_infos_query(void *arg)
 {
-       PGconn     *conn;
-       PGresult   *res;
-       LogicalSlotInfo *slotinfos = NULL;
-       int                     num_slots;
-
-       /* Logical slots can be migrated since PG17. */
-       if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
-               return;
-
-       conn = connectToServer(&old_cluster, dbinfo->db_name);
-
        /*
         * Fetch the logical replication slot information. The check whether the
         * slot is considered caught up is done by an upgrade function. This
@@ -669,18 +650,23 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
         * started and stopped several times causing any temporary slots to be
         * removed.
         */
-       res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, 
failover, "
-                                                       "%s as caught_up, 
invalidation_reason IS NOT NULL as invalid "
-                                                       "FROM 
pg_catalog.pg_replication_slots "
-                                                       "WHERE slot_type = 
'logical' AND "
-                                                       "database = 
current_database() AND "
-                                                       "temporary IS FALSE;",
-                                                       user_opts.live_check ? 
"FALSE" :
-                                                       "(CASE WHEN 
invalidation_reason IS NOT NULL THEN FALSE "
-                                                       "ELSE (SELECT 
pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
-                                                       "END)");
-
-       num_slots = PQntuples(res);
+       return psprintf("SELECT slot_name, plugin, two_phase, failover, "
+                                       "%s as caught_up, invalidation_reason 
IS NOT NULL as invalid "
+                                       "FROM pg_catalog.pg_replication_slots "
+                                       "WHERE slot_type = 'logical' AND "
+                                       "database = current_database() AND "
+                                       "temporary IS FALSE;",
+                                       user_opts.live_check ? "FALSE" :
+                                       "(CASE WHEN invalidation_reason IS NOT 
NULL THEN FALSE "
+                                       "ELSE (SELECT 
pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
+                                       "END)");
+}
+
+static void
+get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult *res, void 
*arg)
+{
+       LogicalSlotInfo *slotinfos = NULL;
+       int                     num_slots = PQntuples(res);
 
        if (num_slots)
        {
@@ -713,14 +699,10 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
                }
        }
 
-       PQclear(res);
-       PQfinish(conn);
-
        dbinfo->slot_arr.slots = slotinfos;
        dbinfo->slot_arr.nslots = num_slots;
 }
 
-
 /*
  * count_old_cluster_logical_slots()
  *
-- 
2.39.3 (Apple Git-146)

>From 1c822873b79171acb4ab50bf61d7a592b73dac09 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 21:24:35 -0500
Subject: [PATCH v5 06/13] use new pg_upgrade async API to parallelize getting
 loadable libraries

---
 src/bin/pg_upgrade/function.c | 63 ++++++++++++++++++++---------------
 1 file changed, 37 insertions(+), 26 deletions(-)

diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c
index 7e3abed098..da3717e71c 100644
--- a/src/bin/pg_upgrade/function.c
+++ b/src/bin/pg_upgrade/function.c
@@ -42,6 +42,32 @@ library_name_compare(const void *p1, const void *p2)
                                          ((const LibraryInfo *) p2)->dbnum);
 }
 
+struct loadable_libraries_state
+{
+       PGresult  **ress;
+       int                     totaltups;
+};
+
+static char *
+get_loadable_libraries_query(void *arg)
+{
+       return psprintf("SELECT DISTINCT probin "
+                                       "FROM pg_catalog.pg_proc "
+                                       "WHERE prolang = %u AND "
+                                       "probin IS NOT NULL AND "
+                                       "oid >= %u;",
+                                       ClanguageId,
+                                       FirstNormalObjectId);
+}
+
+static void
+get_loadable_libraries_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       struct loadable_libraries_state *state = (struct 
loadable_libraries_state *) arg;
+
+       state->ress[dbinfo - old_cluster.dbarr.dbs] = res;
+       state->totaltups += PQntuples(res);
+}
 
 /*
  * get_loadable_libraries()
@@ -54,47 +80,32 @@ library_name_compare(const void *p1, const void *p2)
 void
 get_loadable_libraries(void)
 {
-       PGresult  **ress;
        int                     totaltups;
        int                     dbnum;
        int                     n_libinfos;
+       AsyncTask  *task = async_task_create();
+       struct loadable_libraries_state state;
 
-       ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult 
*));
-       totaltups = 0;
+       state.ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * 
sizeof(PGresult *));
+       state.totaltups = 0;
 
-       /* Fetch all library names, removing duplicates within each DB */
-       for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
-       {
-               DbInfo     *active_db = &old_cluster.dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(&old_cluster, 
active_db->db_name);
+       async_task_add_step(task, get_loadable_libraries_query,
+                                               get_loadable_libraries_result, 
false, &state);
 
-               /*
-                * Fetch all libraries containing non-built-in C functions in 
this DB.
-                */
-               ress[dbnum] = executeQueryOrDie(conn,
-                                                                               
"SELECT DISTINCT probin "
-                                                                               
"FROM pg_catalog.pg_proc "
-                                                                               
"WHERE prolang = %u AND "
-                                                                               
"probin IS NOT NULL AND "
-                                                                               
"oid >= %u;",
-                                                                               
ClanguageId,
-                                                                               
FirstNormalObjectId);
-               totaltups += PQntuples(ress[dbnum]);
-
-               PQfinish(conn);
-       }
+       async_task_run(task, &old_cluster);
+       async_task_free(task);
 
        /*
         * Allocate memory for required libraries and logical replication output
         * plugins.
         */
-       n_libinfos = totaltups + count_old_cluster_logical_slots();
+       n_libinfos = state.totaltups + count_old_cluster_logical_slots();
        os_info.libraries = (LibraryInfo *) pg_malloc(sizeof(LibraryInfo) * 
n_libinfos);
        totaltups = 0;
 
        for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
        {
-               PGresult   *res = ress[dbnum];
+               PGresult   *res = state.ress[dbnum];
                int                     ntups;
                int                     rowno;
                LogicalSlotInfoArr *slot_arr = 
&old_cluster.dbarr.dbs[dbnum].slot_arr;
@@ -129,7 +140,7 @@ get_loadable_libraries(void)
                }
        }
 
-       pg_free(ress);
+       pg_free(state.ress);
 
        os_info.num_libraries = totaltups;
 }
-- 
2.39.3 (Apple Git-146)

>From 274965e786fbaf369f01eb7ea24319b220580c7d Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 21:31:57 -0500
Subject: [PATCH v5 07/13] use new pg_upgrade async API to parallelize
 reporting extension updates

---
 src/bin/pg_upgrade/version.c | 82 ++++++++++++++++++------------------
 1 file changed, 41 insertions(+), 41 deletions(-)

diff --git a/src/bin/pg_upgrade/version.c b/src/bin/pg_upgrade/version.c
index 2de6dffccd..bfcf08c936 100644
--- a/src/bin/pg_upgrade/version.c
+++ b/src/bin/pg_upgrade/version.c
@@ -139,6 +139,42 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool 
check_mode)
                check_ok();
 }
 
+static char *
+report_extension_updates_query(void *arg)
+{
+       return pg_strdup("SELECT name "
+                                        "FROM pg_available_extensions "
+                                        "WHERE installed_version != 
default_version");
+}
+
+static void
+report_extension_updates_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       int                     i_name = PQfnumber(res, "name");
+       char       *output_path = "update_extensions.sql";
+       FILE      **script = (FILE **) arg;
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (*script == NULL && (*script = fopen_priv(output_path, "w")) 
== NULL)
+                       pg_fatal("could not open file \"%s\": %m", output_path);
+               if (!db_used)
+               {
+                       PQExpBufferData connectbuf;
+
+                       initPQExpBuffer(&connectbuf);
+                       appendPsqlMetaConnect(&connectbuf, dbinfo->db_name);
+                       fputs(connectbuf.data, *script);
+                       termPQExpBuffer(&connectbuf);
+                       db_used = true;
+               }
+               fprintf(*script, "ALTER EXTENSION %s UPDATE;\n",
+                               quote_identifier(PQgetvalue(res, rowno, 
i_name)));
+       }
+}
+
 /*
  * report_extension_updates()
  *     Report extensions that should be updated.
@@ -146,53 +182,17 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, 
bool check_mode)
 void
 report_extension_updates(ClusterInfo *cluster)
 {
-       int                     dbnum;
        FILE       *script = NULL;
        char       *output_path = "update_extensions.sql";
+       AsyncTask  *task = async_task_create();
 
        prep_status("Checking for extension updates");
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_name;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /* find extensions needing updates */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT name "
-                                                               "FROM 
pg_available_extensions "
-                                                               "WHERE 
installed_version != default_version"
-                       );
-
-               ntups = PQntuples(res);
-               i_name = PQfnumber(res, "name");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL && (script = fopen_priv(output_path, 
"w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
-                       if (!db_used)
-                       {
-                               PQExpBufferData connectbuf;
-
-                               initPQExpBuffer(&connectbuf);
-                               appendPsqlMetaConnect(&connectbuf, 
active_db->db_name);
-                               fputs(connectbuf.data, script);
-                               termPQExpBuffer(&connectbuf);
-                               db_used = true;
-                       }
-                       fprintf(script, "ALTER EXTENSION %s UPDATE;\n",
-                                       quote_identifier(PQgetvalue(res, rowno, 
i_name)));
-               }
+       async_task_add_step(task, report_extension_updates_query,
+                                               
report_extension_updates_result, true, &script);
 
-               PQclear(res);
-
-               PQfinish(conn);
-       }
+       async_task_run(task, cluster);
+       async_task_free(task);
 
        if (script)
        {
-- 
2.39.3 (Apple Git-146)

>From c546c0106c0ff79e8bb74bdd4a902ab53f33f4f5 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Sat, 6 Jul 2024 21:06:31 -0500
Subject: [PATCH v5 08/13] parallelize data type checks in pg_upgrade

---
 src/bin/pg_upgrade/check.c | 319 +++++++++++++++++++------------------
 1 file changed, 160 insertions(+), 159 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 41a23e7efe..9877c7012f 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -32,6 +32,10 @@ static void 
check_new_cluster_subscription_configuration(void);
 static void check_old_cluster_for_valid_slots(void);
 static void check_old_cluster_subscription_state(void);
 
+static bool *data_type_check_results;
+static bool data_type_check_failed;
+static PQExpBufferData data_type_check_report;
+
 /*
  * DataTypesUsageChecks - definitions of data type checks for the old cluster
  * in order to determine if an upgrade can be performed.  See the comment on
@@ -314,6 +318,129 @@ static DataTypesUsageChecks data_types_usage_checks[] =
        }
 };
 
+static char *
+data_type_check_query(void *arg)
+{
+       int                     i = (int) (intptr_t) arg;
+       DataTypesUsageChecks *check = &data_types_usage_checks[i];
+
+       return psprintf("WITH RECURSIVE oids AS ( "
+       /* start with the type(s) returned by base_query */
+                                       "       %s "
+                                       "       UNION ALL "
+                                       "       SELECT * FROM ( "
+       /* inner WITH because we can only reference the CTE once */
+                                       "               WITH x AS (SELECT oid 
FROM oids) "
+       /* domains on any type selected so far */
+                                       "                       SELECT t.oid 
FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
+                                       "                       UNION ALL "
+       /* arrays over any type selected so far */
+                                       "                       SELECT t.oid 
FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
+                                       "                       UNION ALL "
+       /* composite types containing any type selected so far */
+                                       "                       SELECT t.oid 
FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
+                                       "                       WHERE t.typtype 
= 'c' AND "
+                                       "                                 t.oid 
= c.reltype AND "
+                                       "                                 c.oid 
= a.attrelid AND "
+                                       "                                 NOT 
a.attisdropped AND "
+                                       "                                 
a.atttypid = x.oid "
+                                       "                       UNION ALL "
+       /* ranges containing any type selected so far */
+                                       "                       SELECT t.oid 
FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
+                                       "                       WHERE t.typtype 
= 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
+                                       "       ) foo "
+                                       ") "
+       /* now look for stored columns of any such type */
+                                       "SELECT n.nspname, c.relname, a.attname 
"
+                                       "FROM   pg_catalog.pg_class c, "
+                                       "               pg_catalog.pg_namespace 
n, "
+                                       "               pg_catalog.pg_attribute 
a "
+                                       "WHERE  c.oid = a.attrelid AND "
+                                       "               NOT a.attisdropped AND "
+                                       "               a.atttypid IN (SELECT 
oid FROM oids) AND "
+                                       "               c.relkind IN ("
+                                       CppAsString2(RELKIND_RELATION) ", "
+                                       CppAsString2(RELKIND_MATVIEW) ", "
+                                       CppAsString2(RELKIND_INDEX) ") AND "
+                                       "               c.relnamespace = n.oid 
AND "
+       /* exclude possible orphaned temp tables */
+                                       "               n.nspname !~ 
'^pg_temp_' AND "
+                                       "               n.nspname !~ 
'^pg_toast_temp_' AND "
+       /* exclude system catalogs, too */
+                                       "               n.nspname NOT IN 
('pg_catalog', 'information_schema')",
+                                       check->base_query);
+}
+
+static void
+data_type_check_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       int                     i = (int) (intptr_t) arg;
+       DataTypesUsageChecks *check = &data_types_usage_checks[i];
+       int                     ntups = PQntuples(res);
+
+       if (ntups)
+       {
+               char            output_path[MAXPGPATH];
+               int                     i_nspname;
+               int                     i_relname;
+               int                     i_attname;
+               FILE       *script = NULL;
+               bool            db_used = false;
+
+               snprintf(output_path, sizeof(output_path), "%s/%s",
+                                log_opts.basedir,
+                                check->report_filename);
+
+               /*
+                * Make sure we have a buffer to save reports to now that we 
found a
+                * first failing check.
+                */
+               if (!data_type_check_failed)
+                       initPQExpBuffer(&data_type_check_report);
+               data_type_check_failed = true;
+
+               /*
+                * If this is the first time we see an error for the check in 
question
+                * then print a status message of the failure.
+                */
+               if (!data_type_check_results[i])
+               {
+                       pg_log(PG_REPORT, "    failed check: %s", 
_(check->status));
+                       appendPQExpBuffer(&data_type_check_report, "\n%s\n%s    
%s\n",
+                                                         _(check->report_text),
+                                                         _("A list of the 
problem columns is in the file:"),
+                                                         output_path);
+               }
+               data_type_check_results[i] = true;
+
+               i_nspname = PQfnumber(res, "nspname");
+               i_relname = PQfnumber(res, "relname");
+               i_attname = PQfnumber(res, "attname");
+
+               for (int rowno = 0; rowno < ntups; rowno++)
+               {
+                       if (script == NULL && (script = fopen_priv(output_path, 
"a")) == NULL)
+                               pg_fatal("could not open file \"%s\": %m", 
output_path);
+
+                       if (!db_used)
+                       {
+                               fprintf(script, "In database: %s\n", 
dbinfo->db_name);
+                               db_used = true;
+                       }
+                       fprintf(script, "  %s.%s.%s\n",
+                                       PQgetvalue(res, rowno, i_nspname),
+                                       PQgetvalue(res, rowno, i_relname),
+                                       PQgetvalue(res, rowno, i_attname));
+               }
+
+               if (script)
+               {
+                       fclose(script);
+                       script = NULL;
+               }
+       }
+}
+
 /*
  * check_for_data_types_usage()
  *     Detect whether there are any stored columns depending on given type(s)
@@ -337,10 +464,9 @@ static void
 check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks)
 {
        bool            found = false;
-       bool       *results;
-       PQExpBufferData report;
        DataTypesUsageChecks *tmp = checks;
        int                     n_data_types_usage_checks = 0;
+       AsyncTask  *task = async_task_create();
 
        prep_status("Checking for data type usage");
 
@@ -352,176 +478,51 @@ check_for_data_types_usage(ClusterInfo *cluster, 
DataTypesUsageChecks *checks)
        }
 
        /* Prepare an array to store the results of checks in */
-       results = pg_malloc0(sizeof(bool) * n_data_types_usage_checks);
+       data_type_check_results = pg_malloc0(sizeof(bool) * 
n_data_types_usage_checks);
+       data_type_check_failed = false;
 
-       /*
-        * Connect to each database in the cluster and run all defined checks
-        * against that database before trying the next one.
-        */
-       for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+       for (int i = 0; i < n_data_types_usage_checks; i++)
        {
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
+               DataTypesUsageChecks *check = &checks[i];
 
-               for (int checknum = 0; checknum < n_data_types_usage_checks; 
checknum++)
+               if (check->threshold_version == MANUAL_CHECK)
                {
-                       PGresult   *res;
-                       int                     ntups;
-                       int                     i_nspname;
-                       int                     i_relname;
-                       int                     i_attname;
-                       FILE       *script = NULL;
-                       bool            db_used = false;
-                       char            output_path[MAXPGPATH];
-                       DataTypesUsageChecks *cur_check = &checks[checknum];
-
-                       if (cur_check->threshold_version == MANUAL_CHECK)
-                       {
-                               Assert(cur_check->version_hook);
-
-                               /*
-                                * Make sure that the check applies to the 
current cluster
-                                * version and skip if not. If no check hook 
has been defined
-                                * we run the check for all versions.
-                                */
-                               if (!cur_check->version_hook(cluster))
-                                       continue;
-                       }
-                       else if (cur_check->threshold_version != ALL_VERSIONS)
-                       {
-                               if (GET_MAJOR_VERSION(cluster->major_version) > 
cur_check->threshold_version)
-                                       continue;
-                       }
-                       else
-                               Assert(cur_check->threshold_version == 
ALL_VERSIONS);
-
-                       snprintf(output_path, sizeof(output_path), "%s/%s",
-                                        log_opts.basedir,
-                                        cur_check->report_filename);
-
-                       /*
-                        * The type(s) of interest might be wrapped in a 
domain, array,
-                        * composite, or range, and these container types can 
be nested
-                        * (to varying extents depending on server version, but 
that's not
-                        * of concern here).  To handle all these cases we need 
a
-                        * recursive CTE.
-                        */
-                       res = executeQueryOrDie(conn,
-                                                                       "WITH 
RECURSIVE oids AS ( "
-                       /* start with the type(s) returned by base_query */
-                                                                       "       
%s "
-                                                                       "       
UNION ALL "
-                                                                       "       
SELECT * FROM ( "
-                       /* inner WITH because we can only reference the CTE 
once */
-                                                                       "       
        WITH x AS (SELECT oid FROM oids) "
-                       /* domains on any type selected so far */
-                                                                       "       
                SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = 
x.oid AND typtype = 'd' "
-                                                                       "       
                UNION ALL "
-                       /* arrays over any type selected so far */
-                                                                       "       
                SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid 
AND typtype = 'b' "
-                                                                       "       
                UNION ALL "
-                       /* composite types containing any type selected so far 
*/
-                                                                       "       
                SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, 
pg_catalog.pg_attribute a, x "
-                                                                       "       
                WHERE t.typtype = 'c' AND "
-                                                                       "       
                          t.oid = c.reltype AND "
-                                                                       "       
                          c.oid = a.attrelid AND "
-                                                                       "       
                          NOT a.attisdropped AND "
-                                                                       "       
                          a.atttypid = x.oid "
-                                                                       "       
                UNION ALL "
-                       /* ranges containing any type selected so far */
-                                                                       "       
                SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, 
x "
-                                                                       "       
                WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = 
x.oid"
-                                                                       "       
) foo "
-                                                                       ") "
-                       /* now look for stored columns of any such type */
-                                                                       "SELECT 
n.nspname, c.relname, a.attname "
-                                                                       "FROM   
pg_catalog.pg_class c, "
-                                                                       "       
        pg_catalog.pg_namespace n, "
-                                                                       "       
        pg_catalog.pg_attribute a "
-                                                                       "WHERE  
c.oid = a.attrelid AND "
-                                                                       "       
        NOT a.attisdropped AND "
-                                                                       "       
        a.atttypid IN (SELECT oid FROM oids) AND "
-                                                                       "       
        c.relkind IN ("
-                                                                       
CppAsString2(RELKIND_RELATION) ", "
-                                                                       
CppAsString2(RELKIND_MATVIEW) ", "
-                                                                       
CppAsString2(RELKIND_INDEX) ") AND "
-                                                                       "       
        c.relnamespace = n.oid AND "
-                       /* exclude possible orphaned temp tables */
-                                                                       "       
        n.nspname !~ '^pg_temp_' AND "
-                                                                       "       
        n.nspname !~ '^pg_toast_temp_' AND "
-                       /* exclude system catalogs, too */
-                                                                       "       
        n.nspname NOT IN ('pg_catalog', 'information_schema')",
-                                                                       
cur_check->base_query);
-
-                       ntups = PQntuples(res);
+                       Assert(check->version_hook);
 
                        /*
-                        * The datatype was found, so extract the data and log 
to the
-                        * requested filename. We need to open the file for 
appending
-                        * since the check might have already found the type in 
another
-                        * database earlier in the loop.
+                        * Make sure that the check applies to the current 
cluster version
+                        * and skip it if not.
                         */
-                       if (ntups)
-                       {
-                               /*
-                                * Make sure we have a buffer to save reports 
to now that we
-                                * found a first failing check.
-                                */
-                               if (!found)
-                                       initPQExpBuffer(&report);
-                               found = true;
-
-                               /*
-                                * If this is the first time we see an error 
for the check in
-                                * question then print a status message of the 
failure.
-                                */
-                               if (!results[checknum])
-                               {
-                                       pg_log(PG_REPORT, "    failed check: 
%s", _(cur_check->status));
-                                       appendPQExpBuffer(&report, "\n%s\n%s    
%s\n",
-                                                                         
_(cur_check->report_text),
-                                                                         _("A 
list of the problem columns is in the file:"),
-                                                                         
output_path);
-                               }
-                               results[checknum] = true;
-
-                               i_nspname = PQfnumber(res, "nspname");
-                               i_relname = PQfnumber(res, "relname");
-                               i_attname = PQfnumber(res, "attname");
-
-                               for (int rowno = 0; rowno < ntups; rowno++)
-                               {
-                                       if (script == NULL && (script = 
fopen_priv(output_path, "a")) == NULL)
-                                               pg_fatal("could not open file 
\"%s\": %m", output_path);
-
-                                       if (!db_used)
-                                       {
-                                               fprintf(script, "In database: 
%s\n", active_db->db_name);
-                                               db_used = true;
-                                       }
-                                       fprintf(script, "  %s.%s.%s\n",
-                                                       PQgetvalue(res, rowno, 
i_nspname),
-                                                       PQgetvalue(res, rowno, 
i_relname),
-                                                       PQgetvalue(res, rowno, 
i_attname));
-                               }
-
-                               if (script)
-                               {
-                                       fclose(script);
-                                       script = NULL;
-                               }
-                       }
-
-                       PQclear(res);
+                       if (!check->version_hook(cluster))
+                               continue;
                }
+               else if (check->threshold_version != ALL_VERSIONS)
+               {
+                       if (GET_MAJOR_VERSION(cluster->major_version) > 
check->threshold_version)
+                               continue;
+               }
+               else
+                       Assert(check->threshold_version == ALL_VERSIONS);
 
-               PQfinish(conn);
+               async_task_add_step(task, data_type_check_query,
+                                                       
data_type_check_process, true,
+                                                       (void *) (intptr_t) i);
        }
 
+       /*
+        * Connect to each database in the cluster and run all defined checks
+        * against that database before trying the next one.
+        */
+       async_task_run(task, cluster);
+       async_task_free(task);
+
        if (found)
-               pg_fatal("Data type checks failed: %s", report.data);
+       {
+               pg_fatal("Data type checks failed: %s", 
data_type_check_report.data);
+               termPQExpBuffer(&data_type_check_report);
+       }
 
-       pg_free(results);
+       pg_free(data_type_check_results);
 
        check_ok();
 }
-- 
2.39.3 (Apple Git-146)

>From b882aa11fb05c4985934e45496e7c374c8bf2461 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 8 Jul 2024 21:00:20 -0500
Subject: [PATCH v5 09/13] parallelize isn and int8 passing mismatch check in
 pg_upgrade

---
 src/bin/pg_upgrade/check.c | 85 ++++++++++++++++++++------------------
 1 file changed, 44 insertions(+), 41 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 9877c7012f..505ebd1caa 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1193,6 +1193,44 @@ check_for_prepared_transactions(ClusterInfo *cluster)
        check_ok();
 }
 
+static char *
+isn_and_int8_passing_mismatch_query(void *arg)
+{
+       return pg_strdup("SELECT n.nspname, p.proname "
+                                        "FROM   pg_catalog.pg_proc p, "
+                                        "       pg_catalog.pg_namespace n "
+                                        "WHERE  p.pronamespace = n.oid AND "
+                                        "       p.probin = '$libdir/isn'");
+}
+
+static void
+isn_and_int8_passing_mismatch_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       int                     i_nspname = PQfnumber(res, "nspname");
+       int                     i_proname = PQfnumber(res, "proname");
+       FILE      **script = (FILE **) arg;
+       char            output_path[MAXPGPATH];
+
+       snprintf(output_path, sizeof(output_path), "%s/%s",
+                        log_opts.basedir,
+                        "contrib_isn_and_int8_pass_by_value.txt");
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (*script == NULL && (*script = fopen_priv(output_path, "w")) 
== NULL)
+                       pg_fatal("could not open file \"%s\": %m", output_path);
+               if (!db_used)
+               {
+                       fprintf(*script, "In database: %s\n", dbinfo->db_name);
+                       db_used = true;
+               }
+               fprintf(*script, "  %s.%s\n",
+                               PQgetvalue(res, rowno, i_nspname),
+                               PQgetvalue(res, rowno, i_proname));
+       }
+}
 
 /*
  *     check_for_isn_and_int8_passing_mismatch()
@@ -1204,8 +1242,8 @@ check_for_prepared_transactions(ClusterInfo *cluster)
 static void
 check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
 {
-       int                     dbnum;
        FILE       *script = NULL;
+       AsyncTask  *task;
        char            output_path[MAXPGPATH];
 
        prep_status("Checking for contrib/isn with bigint-passing mismatch");
@@ -1222,46 +1260,11 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo 
*cluster)
                         log_opts.basedir,
                         "contrib_isn_and_int8_pass_by_value.txt");
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_nspname,
-                                       i_proname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /* Find any functions coming from contrib/isn */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT 
n.nspname, p.proname "
-                                                               "FROM   
pg_catalog.pg_proc p, "
-                                                               "               
pg_catalog.pg_namespace n "
-                                                               "WHERE  
p.pronamespace = n.oid AND "
-                                                               "               
p.probin = '$libdir/isn'");
-
-               ntups = PQntuples(res);
-               i_nspname = PQfnumber(res, "nspname");
-               i_proname = PQfnumber(res, "proname");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL && (script = fopen_priv(output_path, 
"w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", 
active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  %s.%s\n",
-                                       PQgetvalue(res, rowno, i_nspname),
-                                       PQgetvalue(res, rowno, i_proname));
-               }
-
-               PQclear(res);
-
-               PQfinish(conn);
-       }
+       task = async_task_create();
+       async_task_add_step(task, isn_and_int8_passing_mismatch_query,
+                                               
isn_and_int8_passing_mismatch_process, true, &script);
+       async_task_run(task, cluster);
+       async_task_free(task);
 
        if (script)
        {
-- 
2.39.3 (Apple Git-146)

>From 79c3f7b4303b6e7a857b02f2194c06cb92d42301 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 8 Jul 2024 21:12:49 -0500
Subject: [PATCH v5 10/13] parallelize user defined postfix ops check in
 pg_upgrade

---
 src/bin/pg_upgrade/check.c | 134 +++++++++++++++++++------------------
 1 file changed, 69 insertions(+), 65 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 505ebd1caa..664039b441 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1282,15 +1282,79 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo 
*cluster)
                check_ok();
 }
 
+static char *
+user_defined_postfix_ops_query(void *arg)
+{
+       /*
+        * The query below hardcodes FirstNormalObjectId as 16384 rather than
+        * interpolating that C #define into the query because, if that #define 
is
+        * ever changed, the cutoff we want to use is the value used by
+        * pre-version 14 servers, not that of some future version.
+        */
+       return pg_strdup("SELECT o.oid AS oproid, "
+                                        "       n.nspname AS oprnsp, "
+                                        "       o.oprname, "
+                                        "       tn.nspname AS typnsp, "
+                                        "       t.typname "
+                                        "FROM pg_catalog.pg_operator o, "
+                                        "     pg_catalog.pg_namespace n, "
+                                        "     pg_catalog.pg_type t, "
+                                        "     pg_catalog.pg_namespace tn "
+                                        "WHERE o.oprnamespace = n.oid AND "
+                                        "      o.oprleft = t.oid AND "
+                                        "      t.typnamespace = tn.oid AND "
+                                        "      o.oprright = 0 AND "
+                                        "      o.oid >= 16384");
+}
+
+static void
+user_defined_postfix_ops_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       FILE      **script = (FILE **) arg;
+       char            output_path[MAXPGPATH];
+       int                     ntups = PQntuples(res);
+       bool            db_used = false;
+       int                     i_oproid = PQfnumber(res, "oproid");
+       int                     i_oprnsp = PQfnumber(res, "oprnsp");
+       int                     i_oprname = PQfnumber(res, "oprname");
+       int                     i_typnsp = PQfnumber(res, "typnsp");
+       int                     i_typname = PQfnumber(res, "typname");
+
+       if (!ntups)
+               return;
+
+       snprintf(output_path, sizeof(output_path), "%s/%s",
+                        log_opts.basedir,
+                        "postfix_ops.txt");
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (*script == NULL &&
+                       (*script = fopen_priv(output_path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", output_path);
+               if (!db_used)
+               {
+                       fprintf(*script, "In database: %s\n", dbinfo->db_name);
+                       db_used = true;
+               }
+               fprintf(*script, "  (oid=%s) %s.%s (%s.%s, NONE)\n",
+                               PQgetvalue(res, rowno, i_oproid),
+                               PQgetvalue(res, rowno, i_oprnsp),
+                               PQgetvalue(res, rowno, i_oprname),
+                               PQgetvalue(res, rowno, i_typnsp),
+                               PQgetvalue(res, rowno, i_typname));
+       }
+}
+
 /*
  * Verify that no user defined postfix operators exist.
  */
 static void
 check_for_user_defined_postfix_ops(ClusterInfo *cluster)
 {
-       int                     dbnum;
        FILE       *script = NULL;
        char            output_path[MAXPGPATH];
+       AsyncTask  *task = async_task_create();
 
        prep_status("Checking for user-defined postfix operators");
 
@@ -1298,70 +1362,10 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
                         log_opts.basedir,
                         "postfix_ops.txt");
 
-       /* Find any user defined postfix operators */
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_oproid,
-                                       i_oprnsp,
-                                       i_oprname,
-                                       i_typnsp,
-                                       i_typname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /*
-                * The query below hardcodes FirstNormalObjectId as 16384 
rather than
-                * interpolating that C #define into the query because, if that
-                * #define is ever changed, the cutoff we want to use is the 
value
-                * used by pre-version 14 servers, not that of some future 
version.
-                */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT o.oid 
AS oproid, "
-                                                               "       
n.nspname AS oprnsp, "
-                                                               "       
o.oprname, "
-                                                               "       
tn.nspname AS typnsp, "
-                                                               "       
t.typname "
-                                                               "FROM 
pg_catalog.pg_operator o, "
-                                                               "     
pg_catalog.pg_namespace n, "
-                                                               "     
pg_catalog.pg_type t, "
-                                                               "     
pg_catalog.pg_namespace tn "
-                                                               "WHERE 
o.oprnamespace = n.oid AND "
-                                                               "      
o.oprleft = t.oid AND "
-                                                               "      
t.typnamespace = tn.oid AND "
-                                                               "      
o.oprright = 0 AND "
-                                                               "      o.oid >= 
16384");
-               ntups = PQntuples(res);
-               i_oproid = PQfnumber(res, "oproid");
-               i_oprnsp = PQfnumber(res, "oprnsp");
-               i_oprname = PQfnumber(res, "oprname");
-               i_typnsp = PQfnumber(res, "typnsp");
-               i_typname = PQfnumber(res, "typname");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL &&
-                               (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", 
active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  (oid=%s) %s.%s (%s.%s, NONE)\n",
-                                       PQgetvalue(res, rowno, i_oproid),
-                                       PQgetvalue(res, rowno, i_oprnsp),
-                                       PQgetvalue(res, rowno, i_oprname),
-                                       PQgetvalue(res, rowno, i_typnsp),
-                                       PQgetvalue(res, rowno, i_typname));
-               }
-
-               PQclear(res);
-
-               PQfinish(conn);
-       }
+       async_task_add_step(task, user_defined_postfix_ops_query,
+                                               
user_defined_postfix_ops_process, true, &script);
+       async_task_run(task, cluster);
+       async_task_free(task);
 
        if (script)
        {
-- 
2.39.3 (Apple Git-146)

>From 3ee389a9ffebf05fb691beb16b0e0a5f94d02f9b Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 8 Jul 2024 21:30:19 -0500
Subject: [PATCH v5 11/13] parallelize incompatible polymorphics check in
 pg_upgrade

---
 src/bin/pg_upgrade/check.c       | 174 ++++++++++++++++---------------
 src/tools/pgindent/typedefs.list |   1 +
 2 files changed, 92 insertions(+), 83 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 664039b441..614464dfd3 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1381,6 +1381,81 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
                check_ok();
 }
 
+typedef struct incompat_polymorphics_state
+{
+       FILE       *script;
+       PQExpBufferData old_polymorphics;
+       char            output_path[MAXPGPATH];
+} incompat_polymorphics_state;
+
+static char *
+incompat_polymorphics_query(void *arg)
+{
+       incompat_polymorphics_state *state = (incompat_polymorphics_state *) 
arg;
+
+       /*
+        * The query below hardcodes FirstNormalObjectId as 16384 rather than
+        * interpolating that C #define into the query because, if that #define 
is
+        * ever changed, the cutoff we want to use is the value used by
+        * pre-version 14 servers, not that of some future version.
+        */
+       /* Aggregate transition functions */
+       return psprintf("SELECT 'aggregate' AS objkind, 
p.oid::regprocedure::text AS objname "
+                                       "FROM pg_proc AS p "
+                                       "JOIN pg_aggregate AS a ON 
a.aggfnoid=p.oid "
+                                       "JOIN pg_proc AS transfn ON 
transfn.oid=a.aggtransfn "
+                                       "WHERE p.oid >= 16384 "
+                                       "AND a.aggtransfn = 
ANY(ARRAY[%s]::regprocedure[]) "
+                                       "AND a.aggtranstype = 
ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+       /* Aggregate final functions */
+                                       "UNION ALL "
+                                       "SELECT 'aggregate' AS objkind, 
p.oid::regprocedure::text AS objname "
+                                       "FROM pg_proc AS p "
+                                       "JOIN pg_aggregate AS a ON 
a.aggfnoid=p.oid "
+                                       "JOIN pg_proc AS finalfn ON 
finalfn.oid=a.aggfinalfn "
+                                       "WHERE p.oid >= 16384 "
+                                       "AND a.aggfinalfn = 
ANY(ARRAY[%s]::regprocedure[]) "
+                                       "AND a.aggtranstype = 
ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+       /* Operators */
+                                       "UNION ALL "
+                                       "SELECT 'operator' AS objkind, 
op.oid::regoperator::text AS objname "
+                                       "FROM pg_operator AS op "
+                                       "WHERE op.oid >= 16384 "
+                                       "AND oprcode = 
ANY(ARRAY[%s]::regprocedure[]) "
+                                       "AND oprleft = ANY(ARRAY['anyarray', 
'anyelement']::regtype[]);",
+                                       state->old_polymorphics.data,
+                                       state->old_polymorphics.data,
+                                       state->old_polymorphics.data);
+}
+
+static void
+incompat_polymorphics_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       incompat_polymorphics_state *state = (incompat_polymorphics_state *) 
arg;
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       int                     i_objkind = PQfnumber(res, "objkind");
+       int                     i_objname = PQfnumber(res, "objname");
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (state->script == NULL &&
+                       (state->script = fopen_priv(state->output_path, "w")) 
== NULL)
+                       pg_fatal("could not open file \"%s\": %m", 
state->output_path);
+               if (!db_used)
+               {
+                       fprintf(state->script, "In database: %s\n", 
dbinfo->db_name);
+                       db_used = true;
+               }
+
+               fprintf(state->script, "  %s: %s\n",
+                               PQgetvalue(res, rowno, i_objkind),
+                               PQgetvalue(res, rowno, i_objname));
+       }
+}
+
 /*
  *     check_for_incompatible_polymorphics()
  *
@@ -1390,111 +1465,44 @@ check_for_user_defined_postfix_ops(ClusterInfo 
*cluster)
 static void
 check_for_incompatible_polymorphics(ClusterInfo *cluster)
 {
-       PGresult   *res;
-       FILE       *script = NULL;
-       char            output_path[MAXPGPATH];
-       PQExpBufferData old_polymorphics;
+       AsyncTask  *task = async_task_create();
+       incompat_polymorphics_state state;
 
        prep_status("Checking for incompatible polymorphic functions");
 
-       snprintf(output_path, sizeof(output_path), "%s/%s",
+       state.script = NULL;
+       snprintf(state.output_path, sizeof(state.output_path), "%s/%s",
                         log_opts.basedir,
                         "incompatible_polymorphics.txt");
 
        /* The set of problematic functions varies a bit in different versions 
*/
-       initPQExpBuffer(&old_polymorphics);
+       initPQExpBuffer(&state.old_polymorphics);
 
-       appendPQExpBufferStr(&old_polymorphics,
+       appendPQExpBufferStr(&state.old_polymorphics,
                                                 
"'array_append(anyarray,anyelement)'"
                                                 ", 
'array_cat(anyarray,anyarray)'"
                                                 ", 
'array_prepend(anyelement,anyarray)'");
 
        if (GET_MAJOR_VERSION(cluster->major_version) >= 903)
-               appendPQExpBufferStr(&old_polymorphics,
+               appendPQExpBufferStr(&state.old_polymorphics,
                                                         ", 
'array_remove(anyarray,anyelement)'"
                                                         ", 
'array_replace(anyarray,anyelement,anyelement)'");
 
        if (GET_MAJOR_VERSION(cluster->major_version) >= 905)
-               appendPQExpBufferStr(&old_polymorphics,
+               appendPQExpBufferStr(&state.old_polymorphics,
                                                         ", 
'array_position(anyarray,anyelement)'"
                                                         ", 
'array_position(anyarray,anyelement,integer)'"
                                                         ", 
'array_positions(anyarray,anyelement)'"
                                                         ", 
'width_bucket(anyelement,anyarray)'");
 
-       for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               bool            db_used = false;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-               int                     ntups;
-               int                     i_objkind,
-                                       i_objname;
-
-               /*
-                * The query below hardcodes FirstNormalObjectId as 16384 
rather than
-                * interpolating that C #define into the query because, if that
-                * #define is ever changed, the cutoff we want to use is the 
value
-                * used by pre-version 14 servers, not that of some future 
version.
-                */
-               res = executeQueryOrDie(conn,
-               /* Aggregate transition functions */
-                                                               "SELECT 
'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
-                                                               "FROM pg_proc 
AS p "
-                                                               "JOIN 
pg_aggregate AS a ON a.aggfnoid=p.oid "
-                                                               "JOIN pg_proc 
AS transfn ON transfn.oid=a.aggtransfn "
-                                                               "WHERE p.oid >= 
16384 "
-                                                               "AND 
a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
-                                                               "AND 
a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
-
-               /* Aggregate final functions */
-                                                               "UNION ALL "
-                                                               "SELECT 
'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
-                                                               "FROM pg_proc 
AS p "
-                                                               "JOIN 
pg_aggregate AS a ON a.aggfnoid=p.oid "
-                                                               "JOIN pg_proc 
AS finalfn ON finalfn.oid=a.aggfinalfn "
-                                                               "WHERE p.oid >= 
16384 "
-                                                               "AND 
a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
-                                                               "AND 
a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
-
-               /* Operators */
-                                                               "UNION ALL "
-                                                               "SELECT 
'operator' AS objkind, op.oid::regoperator::text AS objname "
-                                                               "FROM 
pg_operator AS op "
-                                                               "WHERE op.oid 
>= 16384 "
-                                                               "AND oprcode = 
ANY(ARRAY[%s]::regprocedure[]) "
-                                                               "AND oprleft = 
ANY(ARRAY['anyarray', 'anyelement']::regtype[]);",
-                                                               
old_polymorphics.data,
-                                                               
old_polymorphics.data,
-                                                               
old_polymorphics.data);
-
-               ntups = PQntuples(res);
-
-               i_objkind = PQfnumber(res, "objkind");
-               i_objname = PQfnumber(res, "objname");
-
-               for (int rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL &&
-                               (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", 
active_db->db_name);
-                               db_used = true;
-                       }
-
-                       fprintf(script, "  %s: %s\n",
-                                       PQgetvalue(res, rowno, i_objkind),
-                                       PQgetvalue(res, rowno, i_objname));
-               }
-
-               PQclear(res);
-               PQfinish(conn);
-       }
+       async_task_add_step(task, incompat_polymorphics_query,
+                                               incompat_polymorphics_process, 
true, &state);
+       async_task_run(task, cluster);
+       async_task_free(task);
 
-       if (script)
+       if (state.script)
        {
-               fclose(script);
+               fclose(state.script);
                pg_log(PG_REPORT, "fatal");
                pg_fatal("Your installation contains user-defined objects that 
refer to internal\n"
                                 "polymorphic functions with arguments of type 
\"anyarray\" or \"anyelement\".\n"
@@ -1502,12 +1510,12 @@ check_for_incompatible_polymorphics(ClusterInfo 
*cluster)
                                 "afterwards, changing them to refer to the new 
corresponding functions with\n"
                                 "arguments of type \"anycompatiblearray\" and 
\"anycompatible\".\n"
                                 "A list of the problematic objects is in the 
file:\n"
-                                "    %s", output_path);
+                                "    %s", state.output_path);
        }
        else
                check_ok();
 
-       termPQExpBuffer(&old_polymorphics);
+       termPQExpBuffer(&state.old_polymorphics);
 }
 
 /*
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index a2330f4346..66db3d3e2c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3536,6 +3536,7 @@ hstoreUpgrade_t
 hyperLogLogState
 ifState
 import_error_callback_arg
+incompat_polymorphics_state
 indexed_tlist
 inet
 inetKEY
-- 
2.39.3 (Apple Git-146)

>From 20d8f0905226a2e2d05deff83054f7ba7561d5f1 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 8 Jul 2024 21:42:22 -0500
Subject: [PATCH v5 12/13] parallelize tables with oids check in pg_upgrade

---
 src/bin/pg_upgrade/check.c | 90 ++++++++++++++++++++------------------
 1 file changed, 48 insertions(+), 42 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 614464dfd3..a09eafa16e 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1518,15 +1518,58 @@ check_for_incompatible_polymorphics(ClusterInfo 
*cluster)
        termPQExpBuffer(&state.old_polymorphics);
 }
 
+static char *
+with_oids_query(void *arg)
+{
+       return pg_strdup("SELECT n.nspname, c.relname "
+                                        "FROM   pg_catalog.pg_class c, "
+                                        "       pg_catalog.pg_namespace n "
+                                        "WHERE  c.relnamespace = n.oid AND "
+                                        "       c.relhasoids AND"
+                                        "       n.nspname NOT IN 
('pg_catalog')");
+}
+
+static void
+with_oids_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       char            output_path[MAXPGPATH];
+       int                     i_nspname = PQfnumber(res, "nspname");
+       int                     i_relname = PQfnumber(res, "relname");
+       FILE      **script = (FILE **) arg;
+
+       if (!ntups)
+               return;
+
+       snprintf(output_path, sizeof(output_path), "%s/%s",
+                        log_opts.basedir,
+                        "tables_with_oids.txt");
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (*script == NULL && (*script = fopen_priv(output_path, "w")) 
== NULL)
+                       pg_fatal("could not open file \"%s\": %m", output_path);
+               if (!db_used)
+               {
+                       fprintf(*script, "In database: %s\n", dbinfo->db_name);
+                       db_used = true;
+               }
+               fprintf(*script, "  %s.%s\n",
+                               PQgetvalue(res, rowno, i_nspname),
+                               PQgetvalue(res, rowno, i_relname));
+       }
+}
+
 /*
  * Verify that no tables are declared WITH OIDS.
  */
 static void
 check_for_tables_with_oids(ClusterInfo *cluster)
 {
-       int                     dbnum;
        FILE       *script = NULL;
        char            output_path[MAXPGPATH];
+       AsyncTask  *task = async_task_create();
 
        prep_status("Checking for tables WITH OIDS");
 
@@ -1534,47 +1577,10 @@ check_for_tables_with_oids(ClusterInfo *cluster)
                         log_opts.basedir,
                         "tables_with_oids.txt");
 
-       /* Find any tables declared WITH OIDS */
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_nspname,
-                                       i_relname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               res = executeQueryOrDie(conn,
-                                                               "SELECT 
n.nspname, c.relname "
-                                                               "FROM   
pg_catalog.pg_class c, "
-                                                               "               
pg_catalog.pg_namespace n "
-                                                               "WHERE  
c.relnamespace = n.oid AND "
-                                                               "               
c.relhasoids AND"
-                                                               "       
n.nspname NOT IN ('pg_catalog')");
-
-               ntups = PQntuples(res);
-               i_nspname = PQfnumber(res, "nspname");
-               i_relname = PQfnumber(res, "relname");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL && (script = fopen_priv(output_path, 
"w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", 
active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  %s.%s\n",
-                                       PQgetvalue(res, rowno, i_nspname),
-                                       PQgetvalue(res, rowno, i_relname));
-               }
-
-               PQclear(res);
-
-               PQfinish(conn);
-       }
+       async_task_add_step(task, with_oids_query,
+                                               with_oids_process, true, 
&script);
+       async_task_run(task, cluster);
+       async_task_free(task);
 
        if (script)
        {
-- 
2.39.3 (Apple Git-146)

>From 86486c2c8e1862ad616967efa7aa025aa60c9283 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 8 Jul 2024 21:52:13 -0500
Subject: [PATCH v5 13/13] parallelize user defined encoding conversions check
 in pg_upgrade

---
 src/bin/pg_upgrade/check.c | 107 ++++++++++++++++++++-----------------
 1 file changed, 57 insertions(+), 50 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index a09eafa16e..5ef8169488 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1654,15 +1654,66 @@ check_for_pg_role_prefix(ClusterInfo *cluster)
                check_ok();
 }
 
+static char *
+user_defined_encoding_conversions_query(void *arg)
+{
+       /*
+        * The query below hardcodes FirstNormalObjectId as 16384 rather than
+        * interpolating that C #define into the query because, if that #define 
is
+        * ever changed, the cutoff we want to use is the value used by
+        * pre-version 14 servers, not that of some future version.
+        */
+       return pstrdup("SELECT c.oid as conoid, c.conname, n.nspname "
+                                  "FROM pg_catalog.pg_conversion c, "
+                                  "     pg_catalog.pg_namespace n "
+                                  "WHERE c.connamespace = n.oid AND "
+                                  "      c.oid >= 16384");
+}
+
+static void
+user_defined_encoding_conversions_process(DbInfo *dbinfo, PGresult *res, void 
*arg)
+{
+       FILE      **script = (FILE **) arg;
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       char            output_path[MAXPGPATH];
+       int                     i_conoid = PQfnumber(res, "conoid");
+       int                     i_conname = PQfnumber(res, "conname");
+       int                     i_nspname = PQfnumber(res, "nspname");
+
+       if (!ntups)
+               return;
+
+       snprintf(output_path, sizeof(output_path), "%s/%s",
+                        log_opts.basedir,
+                        "encoding_conversions.txt");
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (*script == NULL &&
+                       (*script = fopen_priv(output_path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", output_path);
+               if (!db_used)
+               {
+                       fprintf(*script, "In database: %s\n", dbinfo->db_name);
+                       db_used = true;
+               }
+               fprintf(*script, "  (oid=%s) %s.%s\n",
+                               PQgetvalue(res, rowno, i_conoid),
+                               PQgetvalue(res, rowno, i_nspname),
+                               PQgetvalue(res, rowno, i_conname));
+       }
+}
+
 /*
  * Verify that no user-defined encoding conversions exist.
  */
 static void
 check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
 {
-       int                     dbnum;
        FILE       *script = NULL;
        char            output_path[MAXPGPATH];
+       AsyncTask  *task = async_task_create();
 
        prep_status("Checking for user-defined encoding conversions");
 
@@ -1670,55 +1721,11 @@ check_for_user_defined_encoding_conversions(ClusterInfo 
*cluster)
                         log_opts.basedir,
                         "encoding_conversions.txt");
 
-       /* Find any user defined encoding conversions */
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_conoid,
-                                       i_conname,
-                                       i_nspname;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /*
-                * The query below hardcodes FirstNormalObjectId as 16384 
rather than
-                * interpolating that C #define into the query because, if that
-                * #define is ever changed, the cutoff we want to use is the 
value
-                * used by pre-version 14 servers, not that of some future 
version.
-                */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT c.oid 
as conoid, c.conname, n.nspname "
-                                                               "FROM 
pg_catalog.pg_conversion c, "
-                                                               "     
pg_catalog.pg_namespace n "
-                                                               "WHERE 
c.connamespace = n.oid AND "
-                                                               "      c.oid >= 
16384");
-               ntups = PQntuples(res);
-               i_conoid = PQfnumber(res, "conoid");
-               i_conname = PQfnumber(res, "conname");
-               i_nspname = PQfnumber(res, "nspname");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL &&
-                               (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", 
output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", 
active_db->db_name);
-                               db_used = true;
-                       }
-                       fprintf(script, "  (oid=%s) %s.%s\n",
-                                       PQgetvalue(res, rowno, i_conoid),
-                                       PQgetvalue(res, rowno, i_nspname),
-                                       PQgetvalue(res, rowno, i_conname));
-               }
-
-               PQclear(res);
-
-               PQfinish(conn);
-       }
+       async_task_add_step(task, user_defined_encoding_conversions_query,
+                                               
user_defined_encoding_conversions_process, true,
+                                               &script);
+       async_task_run(task, cluster);
+       async_task_free(task);
 
        if (script)
        {
-- 
2.39.3 (Apple Git-146)

Reply via email to