On Fri, Jul 19, 2024 at 04:21:37PM -0500, Nathan Bossart wrote: > However, while looking into this, I noticed that only one get_query > callback (get_db_subscription_count()) actually customizes the generated > query using information in the provided DbInfo. AFAICT we can do this > particular step without running a query in each database, as I mentioned > elsewhere [0]. That should speed things up a bit and allow us to simplify > the AsyncTask code. > > With that, if we are willing to assume that a given get_query callback will > generate the same string for all databases (and I think we should), we can > run the callback once and save the string in the step for dispatch_query() > to use. This would look more like what you suggested in the quoted text.
Here is a new patch set. I've included the latest revision of the patch to fix get_db_subscription_count() from the other thread [0] as 0001 since I expect that to be committed soon. I've also moved the patch that moves the "live_check" variable to "user_opts" to 0002 since I plan on committing that sooner rather than later, too. Otherwise, I've tried to address all feedback provided thus far. [0] https://commitfest.postgresql.org/49/5135/ -- nathan
>From 75b0c739f3710034ff8910176460c3bfbeb1716c Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Sat, 20 Jul 2024 21:01:29 -0500 Subject: [PATCH v5 01/13] pg_upgrade: retrieve subscription count more efficiently --- src/bin/pg_upgrade/check.c | 9 +++---- src/bin/pg_upgrade/info.c | 43 +++++++-------------------------- src/bin/pg_upgrade/pg_upgrade.h | 3 +-- 3 files changed, 13 insertions(+), 42 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 27924159d6..39d14b7b92 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1797,17 +1797,14 @@ check_new_cluster_subscription_configuration(void) { PGresult *res; PGconn *conn; - int nsubs_on_old; int max_replication_slots; /* Subscriptions and their dependencies can be migrated since PG17. */ if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700) return; - nsubs_on_old = count_old_cluster_subscriptions(); - /* Quick return if there are no subscriptions to be migrated. 
*/ - if (nsubs_on_old == 0) + if (old_cluster.nsubs == 0) return; prep_status("Checking for new cluster configuration for subscriptions"); @@ -1821,10 +1818,10 @@ check_new_cluster_subscription_configuration(void) pg_fatal("could not determine parameter settings on new cluster"); max_replication_slots = atoi(PQgetvalue(res, 0, 0)); - if (nsubs_on_old > max_replication_slots) + if (old_cluster.nsubs > max_replication_slots) pg_fatal("\"max_replication_slots\" (%d) must be greater than or equal to the number of " "subscriptions (%d) on the old cluster", - max_replication_slots, nsubs_on_old); + max_replication_slots, old_cluster.nsubs); PQclear(res); PQfinish(conn); diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 95c22a7200..e43be79aa5 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -28,7 +28,7 @@ static void print_db_infos(DbInfoArr *db_arr); static void print_rel_infos(RelInfoArr *rel_arr); static void print_slot_infos(LogicalSlotInfoArr *slot_arr); static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check); -static void get_db_subscription_count(DbInfo *dbinfo); +static void get_subscription_count(ClusterInfo *cluster); /* @@ -293,17 +293,13 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check) get_rel_infos(cluster, pDbInfo); - /* - * Retrieve the logical replication slots infos and the subscriptions - * count for the old cluster. - */ if (cluster == &old_cluster) - { get_old_cluster_logical_slot_infos(pDbInfo, live_check); - get_db_subscription_count(pDbInfo); - } } + if (cluster == &old_cluster) + get_subscription_count(cluster); + if (cluster == &old_cluster) pg_log(PG_VERBOSE, "\nsource databases:"); else @@ -750,14 +746,14 @@ count_old_cluster_logical_slots(void) /* * get_db_subscription_count() * - * Gets the number of subscriptions in the database referred to by "dbinfo". + * Gets the number of subscriptions in the cluster. 
* * Note: This function will not do anything if the old cluster is pre-PG17. * This is because before that the logical slots are not upgraded, so we will * not be able to upgrade the logical replication clusters completely. */ static void -get_db_subscription_count(DbInfo *dbinfo) +get_subscription_count(ClusterInfo *cluster) { PGconn *conn; PGresult *res; @@ -766,36 +762,15 @@ get_db_subscription_count(DbInfo *dbinfo) if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700) return; - conn = connectToServer(&old_cluster, dbinfo->db_name); + conn = connectToServer(&old_cluster, "template1"); res = executeQueryOrDie(conn, "SELECT count(*) " - "FROM pg_catalog.pg_subscription WHERE subdbid = %u", - dbinfo->db_oid); - dbinfo->nsubs = atoi(PQgetvalue(res, 0, 0)); + "FROM pg_catalog.pg_subscription"); + cluster->nsubs = atoi(PQgetvalue(res, 0, 0)); PQclear(res); PQfinish(conn); } -/* - * count_old_cluster_subscriptions() - * - * Returns the number of subscriptions for all databases. - * - * Note: this function always returns 0 if the old_cluster is PG16 and prior - * because we gather subscriptions only for cluster versions greater than or - * equal to PG17. See get_db_subscription_count(). 
- */ -int -count_old_cluster_subscriptions(void) -{ - int nsubs = 0; - - for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) - nsubs += old_cluster.dbarr.dbs[dbnum].nsubs; - - return nsubs; -} - static void free_db_and_rel_infos(DbInfoArr *db_arr) { diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index 8afe240bdf..e6dbbe6a93 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -197,7 +197,6 @@ typedef struct * path */ RelInfoArr rel_arr; /* array of all user relinfos */ LogicalSlotInfoArr slot_arr; /* array of all LogicalSlotInfo */ - int nsubs; /* number of subscriptions */ } DbInfo; /* @@ -296,6 +295,7 @@ typedef struct char major_version_str[64]; /* string PG_VERSION of cluster */ uint32 bin_version; /* version returned from pg_ctl */ const char *tablespace_suffix; /* directory specification */ + int nsubs; /* number of subscriptions */ } ClusterInfo; @@ -430,7 +430,6 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db, const char *new_pgdata); void get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check); int count_old_cluster_logical_slots(void); -int count_old_cluster_subscriptions(void); /* option.c */ -- 2.39.3 (Apple Git-146)
>From d727dad6968d58464bf8044901badf024ade795e Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Tue, 21 May 2024 16:35:19 -0500 Subject: [PATCH v5 02/13] move live_check variable to user_opts --- src/bin/pg_upgrade/check.c | 32 ++++++++++++++++---------------- src/bin/pg_upgrade/controldata.c | 5 +++-- src/bin/pg_upgrade/info.c | 12 +++++------- src/bin/pg_upgrade/option.c | 4 ++-- src/bin/pg_upgrade/pg_upgrade.c | 21 ++++++++++----------- src/bin/pg_upgrade/pg_upgrade.h | 13 +++++++------ 6 files changed, 43 insertions(+), 44 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 39d14b7b92..d8888f5053 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -29,7 +29,7 @@ static void check_for_new_tablespace_dir(void); static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster); static void check_new_cluster_logical_replication_slots(void); static void check_new_cluster_subscription_configuration(void); -static void check_old_cluster_for_valid_slots(bool live_check); +static void check_old_cluster_for_valid_slots(void); static void check_old_cluster_subscription_state(void); /* @@ -555,9 +555,9 @@ fix_path_separator(char *path) } void -output_check_banner(bool live_check) +output_check_banner(void) { - if (user_opts.check && live_check) + if (user_opts.live_check) { pg_log(PG_REPORT, "Performing Consistency Checks on Old Live Server\n" @@ -573,18 +573,18 @@ output_check_banner(bool live_check) void -check_and_dump_old_cluster(bool live_check) +check_and_dump_old_cluster(void) { /* -- OLD -- */ - if (!live_check) + if (!user_opts.live_check) start_postmaster(&old_cluster, true); /* * Extract a list of databases, tables, and logical replication slots from * the old cluster. 
*/ - get_db_rel_and_slot_infos(&old_cluster, live_check); + get_db_rel_and_slot_infos(&old_cluster); init_tablespaces(); @@ -605,7 +605,7 @@ check_and_dump_old_cluster(bool live_check) * Logical replication slots can be migrated since PG17. See comments * atop get_old_cluster_logical_slot_infos(). */ - check_old_cluster_for_valid_slots(live_check); + check_old_cluster_for_valid_slots(); /* * Subscriptions and their dependencies can be migrated since PG17. @@ -652,7 +652,7 @@ check_and_dump_old_cluster(bool live_check) */ if (GET_MAJOR_VERSION(old_cluster.major_version) <= 906) { - if (user_opts.check) + if (user_opts.live_check) old_9_6_invalidate_hash_indexes(&old_cluster, true); } @@ -667,7 +667,7 @@ check_and_dump_old_cluster(bool live_check) if (!user_opts.check) generate_old_dump(); - if (!live_check) + if (!user_opts.live_check) stop_postmaster(false); } @@ -675,7 +675,7 @@ check_and_dump_old_cluster(bool live_check) void check_new_cluster(void) { - get_db_rel_and_slot_infos(&new_cluster, false); + get_db_rel_and_slot_infos(&new_cluster); check_new_cluster_is_empty(); @@ -826,14 +826,14 @@ check_cluster_versions(void) void -check_cluster_compatibility(bool live_check) +check_cluster_compatibility(void) { /* get/check pg_control data of servers */ - get_control_data(&old_cluster, live_check); - get_control_data(&new_cluster, false); + get_control_data(&old_cluster); + get_control_data(&new_cluster); check_control_data(&old_cluster.controldata, &new_cluster.controldata); - if (live_check && old_cluster.port == new_cluster.port) + if (user_opts.live_check && old_cluster.port == new_cluster.port) pg_fatal("When checking a live server, " "the old and new port numbers must be different."); } @@ -1836,7 +1836,7 @@ check_new_cluster_subscription_configuration(void) * before shutdown. 
*/ static void -check_old_cluster_for_valid_slots(bool live_check) +check_old_cluster_for_valid_slots(void) { char output_path[MAXPGPATH]; FILE *script = NULL; @@ -1875,7 +1875,7 @@ check_old_cluster_for_valid_slots(bool live_check) * Note: This can be satisfied only when the old cluster has been * shut down, so we skip this for live checks. */ - if (!live_check && !slot->caught_up) + if (!user_opts.live_check && !slot->caught_up) { if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c index 1f0ccea3ed..cf665b9dee 100644 --- a/src/bin/pg_upgrade/controldata.c +++ b/src/bin/pg_upgrade/controldata.c @@ -33,7 +33,7 @@ * return valid xid data for a running server. */ void -get_control_data(ClusterInfo *cluster, bool live_check) +get_control_data(ClusterInfo *cluster) { char cmd[MAXPGPATH]; char bufin[MAX_STRING]; @@ -76,6 +76,7 @@ get_control_data(ClusterInfo *cluster, bool live_check) uint32 segno = 0; char *resetwal_bin; int rc; + bool live_check = (cluster == &old_cluster && user_opts.live_check); /* * Because we test the pg_resetwal output as strings, it has to be in @@ -118,7 +119,7 @@ get_control_data(ClusterInfo *cluster, bool live_check) /* * Check for clean shutdown */ - if (!live_check || cluster == &new_cluster) + if (!live_check) { /* only pg_controldata outputs the cluster state */ snprintf(cmd, sizeof(cmd), "\"%s/pg_controldata\" \"%s\"", diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index e43be79aa5..fc94de9ea4 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -27,7 +27,7 @@ static void free_rel_infos(RelInfoArr *rel_arr); static void print_db_infos(DbInfoArr *db_arr); static void print_rel_infos(RelInfoArr *rel_arr); static void print_slot_infos(LogicalSlotInfoArr *slot_arr); -static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check); +static void get_old_cluster_logical_slot_infos(DbInfo 
*dbinfo); static void get_subscription_count(ClusterInfo *cluster); @@ -273,11 +273,9 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db) * * higher level routine to generate dbinfos for the database running * on the given "port". Assumes that server is already running. - * - * live_check would be used only when the target is the old cluster. */ void -get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check) +get_db_rel_and_slot_infos(ClusterInfo *cluster) { int dbnum; @@ -294,7 +292,7 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check) get_rel_infos(cluster, pDbInfo); if (cluster == &old_cluster) - get_old_cluster_logical_slot_infos(pDbInfo, live_check); + get_old_cluster_logical_slot_infos(pDbInfo); } if (cluster == &old_cluster) @@ -641,7 +639,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * are included. */ static void -get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) +get_old_cluster_logical_slot_infos(DbInfo *dbinfo) { PGconn *conn; PGresult *res; @@ -677,7 +675,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) "WHERE slot_type = 'logical' AND " "database = current_database() AND " "temporary IS FALSE;", - live_check ? "FALSE" : + user_opts.live_check ? "FALSE" : "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " "END)"); diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c index 548ea4e623..6f41d63eed 100644 --- a/src/bin/pg_upgrade/option.c +++ b/src/bin/pg_upgrade/option.c @@ -470,10 +470,10 @@ adjust_data_dir(ClusterInfo *cluster) * directory. 
*/ void -get_sock_dir(ClusterInfo *cluster, bool live_check) +get_sock_dir(ClusterInfo *cluster) { #if !defined(WIN32) - if (!live_check) + if (!user_opts.live_check || cluster == &new_cluster) cluster->sockdir = user_opts.socketdir; else { diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c index 03eb738fd7..99f3d4543e 100644 --- a/src/bin/pg_upgrade/pg_upgrade.c +++ b/src/bin/pg_upgrade/pg_upgrade.c @@ -65,7 +65,7 @@ static void create_new_objects(void); static void copy_xact_xlog_xid(void); static void set_frozenxids(bool minmxid_only); static void make_outputdirs(char *pgdata); -static void setup(char *argv0, bool *live_check); +static void setup(char *argv0); static void create_logical_replication_slots(void); ClusterInfo old_cluster, @@ -88,7 +88,6 @@ int main(int argc, char **argv) { char *deletion_script_file_name = NULL; - bool live_check = false; /* * pg_upgrade doesn't currently use common/logging.c, but initialize it @@ -123,18 +122,18 @@ main(int argc, char **argv) */ make_outputdirs(new_cluster.pgdata); - setup(argv[0], &live_check); + setup(argv[0]); - output_check_banner(live_check); + output_check_banner(); check_cluster_versions(); - get_sock_dir(&old_cluster, live_check); - get_sock_dir(&new_cluster, false); + get_sock_dir(&old_cluster); + get_sock_dir(&new_cluster); - check_cluster_compatibility(live_check); + check_cluster_compatibility(); - check_and_dump_old_cluster(live_check); + check_and_dump_old_cluster(); /* -- NEW -- */ @@ -331,7 +330,7 @@ make_outputdirs(char *pgdata) static void -setup(char *argv0, bool *live_check) +setup(char *argv0) { /* * make sure the user has a clean environment, otherwise, we may confuse @@ -378,7 +377,7 @@ setup(char *argv0, bool *live_check) pg_fatal("There seems to be a postmaster servicing the old cluster.\n" "Please shutdown that postmaster and try again."); else - *live_check = true; + user_opts.live_check = true; } } @@ -660,7 +659,7 @@ create_new_objects(void) 
set_frozenxids(true); /* update new_cluster info now that we have objects in the databases */ - get_db_rel_and_slot_infos(&new_cluster, false); + get_db_rel_and_slot_infos(&new_cluster); } /* diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index e6dbbe6a93..08b2ad26d7 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -322,6 +322,7 @@ typedef struct typedef struct { bool check; /* check clusters only, don't change any data */ + bool live_check; /* check clusters only, old server is running */ bool do_sync; /* flush changes to disk */ transferMode transfer_mode; /* copy files or link them? */ int jobs; /* number of processes/threads to use */ @@ -366,20 +367,20 @@ extern OSInfo os_info; /* check.c */ -void output_check_banner(bool live_check); -void check_and_dump_old_cluster(bool live_check); +void output_check_banner(void); +void check_and_dump_old_cluster(void); void check_new_cluster(void); void report_clusters_compatible(void); void issue_warnings_and_set_wal_level(void); void output_completion_banner(char *deletion_script_file_name); void check_cluster_versions(void); -void check_cluster_compatibility(bool live_check); +void check_cluster_compatibility(void); void create_script_for_old_cluster_deletion(char **deletion_script_file_name); /* controldata.c */ -void get_control_data(ClusterInfo *cluster, bool live_check); +void get_control_data(ClusterInfo *cluster); void check_control_data(ControlData *oldctrl, ControlData *newctrl); void disable_old_cluster(void); @@ -428,14 +429,14 @@ void check_loadable_libraries(void); FileNameMap *gen_db_file_maps(DbInfo *old_db, DbInfo *new_db, int *nmaps, const char *old_pgdata, const char *new_pgdata); -void get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check); +void get_db_rel_and_slot_infos(ClusterInfo *cluster); int count_old_cluster_logical_slots(void); /* option.c */ void parseCommandLine(int argc, char *argv[]); void 
adjust_data_dir(ClusterInfo *cluster); -void get_sock_dir(ClusterInfo *cluster, bool live_check); +void get_sock_dir(ClusterInfo *cluster); /* relfilenumber.c */ -- 2.39.3 (Apple Git-146)
>From c955d1d1d769f8864a43cf98dc9dbacf5e6c7f9c Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Fri, 28 Jun 2024 11:02:44 -0500 Subject: [PATCH v5 03/13] introduce framework for parallelizing pg_upgrade tasks --- src/bin/pg_upgrade/Makefile | 1 + src/bin/pg_upgrade/async.c | 327 +++++++++++++++++++++++++++++++ src/bin/pg_upgrade/meson.build | 1 + src/bin/pg_upgrade/pg_upgrade.h | 16 ++ src/tools/pgindent/typedefs.list | 4 + 5 files changed, 349 insertions(+) create mode 100644 src/bin/pg_upgrade/async.c diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile index bde91e2beb..3bc4f5d740 100644 --- a/src/bin/pg_upgrade/Makefile +++ b/src/bin/pg_upgrade/Makefile @@ -12,6 +12,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ $(WIN32RES) \ + async.o \ check.o \ controldata.o \ dump.o \ diff --git a/src/bin/pg_upgrade/async.c b/src/bin/pg_upgrade/async.c new file mode 100644 index 0000000000..742e9df37c --- /dev/null +++ b/src/bin/pg_upgrade/async.c @@ -0,0 +1,327 @@ +/* + * async.c + * + * parallelization via libpq's async APIs + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * src/bin/pg_upgrade/async.c + */ + +#include "postgres_fe.h" + +#include "common/connect.h" +#include "fe_utils/string_utils.h" +#include "pg_upgrade.h" + +static int dbs_complete; +static int dbs_processing; + +typedef struct AsyncTaskCallbacks +{ + AsyncTaskGetQueryCB query_cb; + AsyncTaskProcessCB process_cb; + char *query; + bool free_result; + void *arg; +} AsyncTaskCallbacks; + +typedef struct AsyncTask +{ + AsyncTaskCallbacks *cbs; + int num_cb_sets; +} AsyncTask; + +typedef enum +{ + FREE, + CONNECTING, + SETTING_SEARCH_PATH, + RUNNING_QUERY, +} AsyncSlotState; + +typedef struct +{ + AsyncSlotState state; + int db; + int query; + PGconn *conn; +} AsyncSlot; + +AsyncTask * +async_task_create(void) +{ + return pg_malloc0(sizeof(AsyncTask)); +} + +void +async_task_free(AsyncTask *task) +{ + for (int i = 0; i < 
task->num_cb_sets; i++) + { + if (task->cbs[i].query) + pg_free(task->cbs[i].query); + } + + if (task->cbs) + pg_free(task->cbs); + + pg_free(task); +} + +void +async_task_add_step(AsyncTask *task, + AsyncTaskGetQueryCB query_cb, + AsyncTaskProcessCB process_cb, bool free_result, + void *arg) +{ + AsyncTaskCallbacks *new_cbs; + + task->cbs = pg_realloc(task->cbs, + ++task->num_cb_sets * sizeof(AsyncTaskCallbacks)); + + new_cbs = &task->cbs[task->num_cb_sets - 1]; + new_cbs->query_cb = query_cb; + new_cbs->process_cb = process_cb; + new_cbs->query = NULL; + new_cbs->free_result = free_result; + new_cbs->arg = arg; +} + +static void +conn_failure(PGconn *conn) +{ + pg_fatal("connection failure: %s", PQerrorMessage(conn)); +} + +static void +start_conn(const ClusterInfo *cluster, AsyncSlot *slot) +{ + PQExpBufferData conn_opts; + + /* Build connection string with proper quoting */ + initPQExpBuffer(&conn_opts); + appendPQExpBufferStr(&conn_opts, "dbname="); + appendConnStrVal(&conn_opts, cluster->dbarr.dbs[slot->db].db_name); + appendPQExpBufferStr(&conn_opts, " user="); + appendConnStrVal(&conn_opts, os_info.user); + appendPQExpBuffer(&conn_opts, " port=%d", cluster->port); + if (cluster->sockdir) + { + appendPQExpBufferStr(&conn_opts, " host="); + appendConnStrVal(&conn_opts, cluster->sockdir); + } + + slot->conn = PQconnectStart(conn_opts.data); + termPQExpBuffer(&conn_opts); + + if (!slot->conn) + conn_failure(slot->conn); +} + +static void +dispatch_query(const ClusterInfo *cluster, AsyncSlot *slot, + const AsyncTask *task) +{ + AsyncTaskCallbacks *cbs = &task->cbs[slot->query]; + + if (!cbs->query) + cbs->query = cbs->query_cb(cbs->arg); + + if (!PQsendQuery(slot->conn, cbs->query)) + conn_failure(slot->conn); +} + +static PGresult * +get_last_result(PGconn *conn) +{ + PGresult *tmp; + PGresult *res = NULL; + + while ((tmp = PQgetResult(conn)) != NULL) + { + PQclear(res); + res = tmp; + if (PQstatus(conn) == CONNECTION_BAD) + conn_failure(conn); + } + + if 
(PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK) + conn_failure(conn); + + return res; +} + +static void +process_query_result(const ClusterInfo *cluster, AsyncSlot *slot, + const AsyncTask *task) +{ + AsyncTaskCallbacks *cbs = &task->cbs[slot->query]; + AsyncTaskProcessCB process_cb = cbs->process_cb; + DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db]; + PGresult *res = get_last_result(slot->conn); + + (*process_cb) (dbinfo, res, cbs->arg); + + if (cbs->free_result) + PQclear(res); +} + +static void +process_slot(const ClusterInfo *cluster, AsyncSlot *slot, const AsyncTask *task) +{ + switch (slot->state) + { + case FREE: + if (dbs_processing >= cluster->dbarr.ndbs) + return; + slot->db = dbs_processing++; + slot->state = CONNECTING; + start_conn(cluster, slot); + return; + + case CONNECTING: + if (PQconnectPoll(slot->conn) == PGRES_POLLING_FAILED) + conn_failure(slot->conn); + if (PQconnectPoll(slot->conn) != PGRES_POLLING_OK) + return; + slot->state = SETTING_SEARCH_PATH; + if (!PQsendQuery(slot->conn, ALWAYS_SECURE_SEARCH_PATH_SQL)) + conn_failure(slot->conn); + return; + + case SETTING_SEARCH_PATH: + if (!PQconsumeInput(slot->conn)) + conn_failure(slot->conn); + if (PQisBusy(slot->conn)) + return; + PQclear(get_last_result(slot->conn)); + slot->state = RUNNING_QUERY; + dispatch_query(cluster, slot, task); + return; + + case RUNNING_QUERY: + if (!PQconsumeInput(slot->conn)) + conn_failure(slot->conn); + if (PQisBusy(slot->conn)) + return; + process_query_result(cluster, slot, task); + if (++slot->query >= task->num_cb_sets) + { + dbs_complete++; + PQfinish(slot->conn); + memset(slot, 0, sizeof(AsyncSlot)); + return; + } + dispatch_query(cluster, slot, task); + return; + } +} + +/* + * Wait on the slots to either finish connecting or to receive query results if + * possible. This avoids a tight loop in async_task_run(). 
+ */ +static void +wait_on_slots(AsyncSlot *slots, int numslots) +{ + fd_set input_mask; + fd_set output_mask; + fd_set except_mask; + int maxFd = 0; + + FD_ZERO(&input_mask); + FD_ZERO(&output_mask); + FD_ZERO(&except_mask); + + for (int i = 0; i < numslots; i++) + { + int sock; + bool read = false; + + switch (slots[i].state) + { + case FREE: + + /* + * If we see a free slot, return right away so that it can be + * reused immediately for the next database. This might cause + * us to spin more than necessary as we finish processing the + * last few databases, but that shouldn't cause too much harm. + */ + return; + + case CONNECTING: + + /* + * If we are waiting for the connection to establish, choose + * whether to wait for reading or for writing on the socket as + * appropriate. If neither apply, just return immediately so + * that we can handle the slot. + */ + { + PostgresPollingStatusType status; + + status = PQconnectPoll(slots[i].conn); + if (status == PGRES_POLLING_READING) + read = true; + else if (status != PGRES_POLLING_WRITING) + return; + } + break; + + case SETTING_SEARCH_PATH: + case RUNNING_QUERY: + + /* + * If we've sent a query, we must wait for the socket to be + * read-ready. Note that process_slot() handles calling + * PQconsumeInput() as required. + */ + read = true; + break; + } + + /* + * If there's some problem retrieving the socket, just pretend this + * slot doesn't exist. We don't expect this to happen regularly in + * practice, so it seems unlikely to cause too much harm. + */ + sock = PQsocket(slots[i].conn); + if (sock < 0) + continue; + + /* + * Add the socket to the set. + */ + FD_SET(sock, read ? &input_mask : &output_mask); + FD_SET(sock, &except_mask); + maxFd = Max(maxFd, sock); + } + + /* + * If we found socket(s) to wait on, wait. 
+ */ + if (maxFd != 0) + (void) select(maxFd + 1, &input_mask, &output_mask, &except_mask, NULL); +} + +void +async_task_run(const AsyncTask *task, const ClusterInfo *cluster) +{ + int jobs = Max(1, user_opts.jobs); + AsyncSlot *slots = pg_malloc0(sizeof(AsyncSlot) * jobs); + + dbs_complete = 0; + dbs_processing = 0; + + while (dbs_complete < cluster->dbarr.ndbs) + { + for (int i = 0; i < jobs; i++) + process_slot(cluster, &slots[i], task); + + wait_on_slots(slots, jobs); + } + + pg_free(slots); +} diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build index 9825fa3305..9eb48e176c 100644 --- a/src/bin/pg_upgrade/meson.build +++ b/src/bin/pg_upgrade/meson.build @@ -1,6 +1,7 @@ # Copyright (c) 2022-2024, PostgreSQL Global Development Group pg_upgrade_sources = files( + 'async.c', 'check.c', 'controldata.c', 'dump.c', diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index 08b2ad26d7..dea418ec3f 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -494,3 +494,19 @@ void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr char *old_pgdata, char *new_pgdata, char *old_tablespace); bool reap_child(bool wait_for_child); + +/* async.c */ + +typedef char *(*AsyncTaskGetQueryCB) (void *arg); +typedef void (*AsyncTaskProcessCB) (DbInfo *dbinfo, PGresult *res, void *arg); + +/* struct definition is private to async.c */ +typedef struct AsyncTask AsyncTask; + +AsyncTask *async_task_create(void); +void async_task_add_step(AsyncTask *task, + AsyncTaskGetQueryCB query_cb, + AsyncTaskProcessCB process_cb, bool free_result, + void *arg); +void async_task_run(const AsyncTask *task, const ClusterInfo *cluster); +void async_task_free(AsyncTask *task); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index b4d7f9217c..a2330f4346 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -153,6 +153,10 @@ ArrayMetaState 
ArraySubWorkspace ArrayToken ArrayType +AsyncSlot +AsyncSlotState +AsyncTask +AsyncTaskCallbacks AsyncQueueControl AsyncQueueEntry AsyncRequest -- 2.39.3 (Apple Git-146)
>From 917a33304f1eb20065351075f487d2deb4f8e976 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Fri, 28 Jun 2024 17:21:19 -0500 Subject: [PATCH v5 04/13] use new pg_upgrade async API for subscription state checks --- src/bin/pg_upgrade/check.c | 204 ++++++++++++++++++++----------------- 1 file changed, 110 insertions(+), 94 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index d8888f5053..41a23e7efe 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1903,6 +1903,79 @@ check_old_cluster_for_valid_slots(void) check_ok(); } +/* private state for subscription state checks */ +struct substate_info +{ + FILE *script; + char output_path[MAXPGPATH]; +}; + +/* + * We don't allow upgrade if there is a risk of dangling slot or origin + * corresponding to initial sync after upgrade. + * + * A slot/origin not created yet refers to the 'i' (initialize) state, while + * 'r' (ready) state refers to a slot/origin created previously but already + * dropped. These states are supported for pg_upgrade. The other states listed + * below are not supported: + * + * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state would + * retain a replication slot, which could not be dropped by the sync worker + * spawned after the upgrade because the subscription ID used for the slot name + * won't match anymore. + * + * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state would + * retain the replication origin when there is a failure in tablesync worker + * immediately after dropping the replication slot in the publisher. + * + * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on a + * relation upgraded while in this state would expect an origin ID with the OID + * of the subscription used before the upgrade, causing it to fail. 
+ * + * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and SUBREL_STATE_UNKNOWN: + * These states are not stored in the catalog, so we need not allow these + * states. + */ +static char * +sub_query(void *arg) +{ + return pg_strdup("SELECT r.srsubstate, s.subname, n.nspname, c.relname " + "FROM pg_catalog.pg_subscription_rel r " + "LEFT JOIN pg_catalog.pg_subscription s" + " ON r.srsubid = s.oid " + "LEFT JOIN pg_catalog.pg_class c" + " ON r.srrelid = c.oid " + "LEFT JOIN pg_catalog.pg_namespace n" + " ON c.relnamespace = n.oid " + "WHERE r.srsubstate NOT IN ('i', 'r') " + "ORDER BY s.subname"); +} + +static void +sub_process(DbInfo *dbinfo, PGresult *res, void *arg) +{ + struct substate_info *state = (struct substate_info *) arg; + int ntup = PQntuples(res); + int i_srsubstate = PQfnumber(res, "srsubstate"); + int i_subname = PQfnumber(res, "subname"); + int i_nspname = PQfnumber(res, "nspname"); + int i_relname = PQfnumber(res, "relname"); + + for (int i = 0; i < ntup; i++) + { + if (state->script == NULL && + (state->script = fopen_priv(state->output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", state->output_path); + + fprintf(state->script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n", + PQgetvalue(res, i, i_srsubstate), + dbinfo->db_name, + PQgetvalue(res, i, i_subname), + PQgetvalue(res, i, i_nspname), + PQgetvalue(res, i, i_relname)); + } +} + /* * check_old_cluster_subscription_state() * @@ -1913,115 +1986,58 @@ check_old_cluster_for_valid_slots(void) static void check_old_cluster_subscription_state(void) { - FILE *script = NULL; - char output_path[MAXPGPATH]; + AsyncTask *task = async_task_create(); + struct substate_info state; + PGresult *res; + PGconn *conn; int ntup; prep_status("Checking for subscription state"); - snprintf(output_path, sizeof(output_path), "%s/%s", + state.script = NULL; + snprintf(state.output_path, sizeof(state.output_path), "%s/%s", 
log_opts.basedir, "subs_invalid.txt"); - for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) - { - PGresult *res; - DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(&old_cluster, active_db->db_name); - - /* We need to check for pg_replication_origin only once. */ - if (dbnum == 0) - { - /* - * Check that all the subscriptions have their respective - * replication origin. - */ - res = executeQueryOrDie(conn, - "SELECT d.datname, s.subname " - "FROM pg_catalog.pg_subscription s " - "LEFT OUTER JOIN pg_catalog.pg_replication_origin o " - " ON o.roname = 'pg_' || s.oid " - "INNER JOIN pg_catalog.pg_database d " - " ON d.oid = s.subdbid " - "WHERE o.roname IS NULL;"); - - ntup = PQntuples(res); - for (int i = 0; i < ntup; i++) - { - if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - fprintf(script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n", - PQgetvalue(res, i, 0), - PQgetvalue(res, i, 1)); - } - PQclear(res); - } - /* - * We don't allow upgrade if there is a risk of dangling slot or - * origin corresponding to initial sync after upgrade. - * - * A slot/origin not created yet refers to the 'i' (initialize) state, - * while 'r' (ready) state refers to a slot/origin created previously - * but already dropped. These states are supported for pg_upgrade. The - * other states listed below are not supported: - * - * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state - * would retain a replication slot, which could not be dropped by the - * sync worker spawned after the upgrade because the subscription ID - * used for the slot name won't match anymore. - * - * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state - * would retain the replication origin when there is a failure in - * tablesync worker immediately after dropping the replication slot in - * the publisher. 
- * - * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on - * a relation upgraded while in this state would expect an origin ID - * with the OID of the subscription used before the upgrade, causing - * it to fail. - * - * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and - * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog, - * so we need not allow these states. - */ - res = executeQueryOrDie(conn, - "SELECT r.srsubstate, s.subname, n.nspname, c.relname " - "FROM pg_catalog.pg_subscription_rel r " - "LEFT JOIN pg_catalog.pg_subscription s" - " ON r.srsubid = s.oid " - "LEFT JOIN pg_catalog.pg_class c" - " ON r.srrelid = c.oid " - "LEFT JOIN pg_catalog.pg_namespace n" - " ON c.relnamespace = n.oid " - "WHERE r.srsubstate NOT IN ('i', 'r') " - "ORDER BY s.subname"); - - ntup = PQntuples(res); - for (int i = 0; i < ntup; i++) - { - if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); + /* + * Check that all the subscriptions have their respective replication + * origin. This check only needs to run once. 
+ */ + conn = connectToServer(&old_cluster, old_cluster.dbarr.dbs[0].db_name); + res = executeQueryOrDie(conn, + "SELECT d.datname, s.subname " + "FROM pg_catalog.pg_subscription s " + "LEFT OUTER JOIN pg_catalog.pg_replication_origin o " + " ON o.roname = 'pg_' || s.oid " + "INNER JOIN pg_catalog.pg_database d " + " ON d.oid = s.subdbid " + "WHERE o.roname IS NULL;"); + ntup = PQntuples(res); + for (int i = 0; i < ntup; i++) + { + if (state.script == NULL && + (state.script = fopen_priv(state.output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", state.output_path); + fprintf(state.script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n", + PQgetvalue(res, i, 0), + PQgetvalue(res, i, 1)); + } + PQclear(res); + PQfinish(conn); - fprintf(script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n", - PQgetvalue(res, i, 0), - active_db->db_name, - PQgetvalue(res, i, 1), - PQgetvalue(res, i, 2), - PQgetvalue(res, i, 3)); - } + async_task_add_step(task, sub_query, sub_process, true, &state); - PQclear(res); - PQfinish(conn); - } + async_task_run(task, &old_cluster); + async_task_free(task); - if (script) + if (state.script) { - fclose(script); + fclose(state.script); pg_log(PG_REPORT, "fatal"); pg_fatal("Your installation contains subscriptions without origin or having relations not in i (initialize) or r (ready) state.\n" "You can allow the initial sync to finish for all relations and then restart the upgrade.\n" "A list of the problematic subscriptions is in the file:\n" - " %s", output_path); + " %s", state.output_path); } else check_ok(); -- 2.39.3 (Apple Git-146)
>From e62e03300dc2e207c9e0e22098fea6d93c9a909e Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Fri, 28 Jun 2024 21:09:33 -0500 Subject: [PATCH v5 05/13] use new pg_upgrade async API for retrieving relinfos --- src/bin/pg_upgrade/info.c | 238 ++++++++++++++++++-------------------- 1 file changed, 110 insertions(+), 128 deletions(-) diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index fc94de9ea4..798d96ef43 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -11,6 +11,7 @@ #include "access/transam.h" #include "catalog/pg_class_d.h" +#include "pqexpbuffer.h" #include "pg_upgrade.h" static void create_rel_filename_map(const char *old_data, const char *new_data, @@ -22,13 +23,15 @@ static void report_unmatched_relation(const RelInfo *rel, const DbInfo *db, static void free_db_and_rel_infos(DbInfoArr *db_arr); static void get_template0_info(ClusterInfo *cluster); static void get_db_infos(ClusterInfo *cluster); -static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo); +static char *get_rel_infos_query(void *arg); +static void get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg); static void free_rel_infos(RelInfoArr *rel_arr); static void print_db_infos(DbInfoArr *db_arr); static void print_rel_infos(RelInfoArr *rel_arr); static void print_slot_infos(LogicalSlotInfoArr *slot_arr); -static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo); static void get_subscription_count(ClusterInfo *cluster); +static char *get_old_cluster_logical_slot_infos_query(void *arg); +static void get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult *res, void *arg); /* @@ -277,7 +280,7 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db) void get_db_rel_and_slot_infos(ClusterInfo *cluster) { - int dbnum; + AsyncTask *task = async_task_create(); if (cluster->dbarr.dbs != NULL) free_db_and_rel_infos(&cluster->dbarr); @@ -285,15 +288,19 @@ 
get_db_rel_and_slot_infos(ClusterInfo *cluster) get_template0_info(cluster); get_db_infos(cluster); - for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) - { - DbInfo *pDbInfo = &cluster->dbarr.dbs[dbnum]; - - get_rel_infos(cluster, pDbInfo); + async_task_add_step(task, + get_rel_infos_query, + get_rel_infos_result, + true, NULL); + if (cluster == &old_cluster && + GET_MAJOR_VERSION(cluster->major_version) > 1600) + async_task_add_step(task, + get_old_cluster_logical_slot_infos_query, + get_old_cluster_logical_slot_infos_result, + true, cluster); - if (cluster == &old_cluster) - get_old_cluster_logical_slot_infos(pDbInfo); - } + async_task_run(task, cluster); + async_task_free(task); if (cluster == &old_cluster) get_subscription_count(cluster); @@ -443,32 +450,12 @@ get_db_infos(ClusterInfo *cluster) * Note: the resulting RelInfo array is assumed to be sorted by OID. * This allows later processing to match up old and new databases efficiently. */ -static void -get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) +static char * +get_rel_infos_query(void *arg) { - PGconn *conn = connectToServer(cluster, - dbinfo->db_name); - PGresult *res; - RelInfo *relinfos; - int ntups; - int relnum; - int num_rels = 0; - char *nspname = NULL; - char *relname = NULL; - char *tablespace = NULL; - int i_spclocation, - i_nspname, - i_relname, - i_reloid, - i_indtable, - i_toastheap, - i_relfilenumber, - i_reltablespace; - char query[QUERY_ALLOC]; - char *last_namespace = NULL, - *last_tablespace = NULL; + PQExpBufferData query; - query[0] = '\0'; /* initialize query string to empty */ + initPQExpBuffer(&query); /* * Create a CTE that collects OIDs of regular user tables and matviews, @@ -480,34 +467,34 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * output, so we have to copy that system table. It's easiest to do that * by treating it as a user table. 
*/ - snprintf(query + strlen(query), sizeof(query) - strlen(query), - "WITH regular_heap (reloid, indtable, toastheap) AS ( " - " SELECT c.oid, 0::oid, 0::oid " - " FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n " - " ON c.relnamespace = n.oid " - " WHERE relkind IN (" CppAsString2(RELKIND_RELATION) ", " - CppAsString2(RELKIND_MATVIEW) ") AND " + appendPQExpBuffer(&query, + "WITH regular_heap (reloid, indtable, toastheap) AS ( " + " SELECT c.oid, 0::oid, 0::oid " + " FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n " + " ON c.relnamespace = n.oid " + " WHERE relkind IN (" CppAsString2(RELKIND_RELATION) ", " + CppAsString2(RELKIND_MATVIEW) ") AND " /* exclude possible orphaned temp tables */ - " ((n.nspname !~ '^pg_temp_' AND " - " n.nspname !~ '^pg_toast_temp_' AND " - " n.nspname NOT IN ('pg_catalog', 'information_schema', " - " 'binary_upgrade', 'pg_toast') AND " - " c.oid >= %u::pg_catalog.oid) OR " - " (n.nspname = 'pg_catalog' AND " - " relname IN ('pg_largeobject') ))), ", - FirstNormalObjectId); + " ((n.nspname !~ '^pg_temp_' AND " + " n.nspname !~ '^pg_toast_temp_' AND " + " n.nspname NOT IN ('pg_catalog', 'information_schema', " + " 'binary_upgrade', 'pg_toast') AND " + " c.oid >= %u::pg_catalog.oid) OR " + " (n.nspname = 'pg_catalog' AND " + " relname IN ('pg_largeobject') ))), ", + FirstNormalObjectId); /* * Add a CTE that collects OIDs of toast tables belonging to the tables * selected by the regular_heap CTE. (We have to do this separately * because the namespace-name rules above don't work for toast tables.) 
*/ - snprintf(query + strlen(query), sizeof(query) - strlen(query), - " toast_heap (reloid, indtable, toastheap) AS ( " - " SELECT c.reltoastrelid, 0::oid, c.oid " - " FROM regular_heap JOIN pg_catalog.pg_class c " - " ON regular_heap.reloid = c.oid " - " WHERE c.reltoastrelid != 0), "); + appendPQExpBufferStr(&query, + " toast_heap (reloid, indtable, toastheap) AS ( " + " SELECT c.reltoastrelid, 0::oid, c.oid " + " FROM regular_heap JOIN pg_catalog.pg_class c " + " ON regular_heap.reloid = c.oid " + " WHERE c.reltoastrelid != 0), "); /* * Add a CTE that collects OIDs of all valid indexes on the previously @@ -515,53 +502,61 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * Testing indisready is necessary in 9.2, and harmless in earlier/later * versions. */ - snprintf(query + strlen(query), sizeof(query) - strlen(query), - " all_index (reloid, indtable, toastheap) AS ( " - " SELECT indexrelid, indrelid, 0::oid " - " FROM pg_catalog.pg_index " - " WHERE indisvalid AND indisready " - " AND indrelid IN " - " (SELECT reloid FROM regular_heap " - " UNION ALL " - " SELECT reloid FROM toast_heap)) "); + appendPQExpBufferStr(&query, + " all_index (reloid, indtable, toastheap) AS ( " + " SELECT indexrelid, indrelid, 0::oid " + " FROM pg_catalog.pg_index " + " WHERE indisvalid AND indisready " + " AND indrelid IN " + " (SELECT reloid FROM regular_heap " + " UNION ALL " + " SELECT reloid FROM toast_heap)) "); /* * And now we can write the query that retrieves the data we want for each * heap and index relation. Make sure result is sorted by OID. 
*/ - snprintf(query + strlen(query), sizeof(query) - strlen(query), - "SELECT all_rels.*, n.nspname, c.relname, " - " c.relfilenode, c.reltablespace, " - " pg_catalog.pg_tablespace_location(t.oid) AS spclocation " - "FROM (SELECT * FROM regular_heap " - " UNION ALL " - " SELECT * FROM toast_heap " - " UNION ALL " - " SELECT * FROM all_index) all_rels " - " JOIN pg_catalog.pg_class c " - " ON all_rels.reloid = c.oid " - " JOIN pg_catalog.pg_namespace n " - " ON c.relnamespace = n.oid " - " LEFT OUTER JOIN pg_catalog.pg_tablespace t " - " ON c.reltablespace = t.oid " - "ORDER BY 1;"); - - res = executeQueryOrDie(conn, "%s", query); - - ntups = PQntuples(res); - - relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups); + appendPQExpBufferStr(&query, + "SELECT all_rels.*, n.nspname, c.relname, " + " c.relfilenode, c.reltablespace, " + " pg_catalog.pg_tablespace_location(t.oid) AS spclocation " + "FROM (SELECT * FROM regular_heap " + " UNION ALL " + " SELECT * FROM toast_heap " + " UNION ALL " + " SELECT * FROM all_index) all_rels " + " JOIN pg_catalog.pg_class c " + " ON all_rels.reloid = c.oid " + " JOIN pg_catalog.pg_namespace n " + " ON c.relnamespace = n.oid " + " LEFT OUTER JOIN pg_catalog.pg_tablespace t " + " ON c.reltablespace = t.oid " + "ORDER BY 1;"); + + return query.data; +} - i_reloid = PQfnumber(res, "reloid"); - i_indtable = PQfnumber(res, "indtable"); - i_toastheap = PQfnumber(res, "toastheap"); - i_nspname = PQfnumber(res, "nspname"); - i_relname = PQfnumber(res, "relname"); - i_relfilenumber = PQfnumber(res, "relfilenode"); - i_reltablespace = PQfnumber(res, "reltablespace"); - i_spclocation = PQfnumber(res, "spclocation"); +static void +get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg) +{ + int ntups = PQntuples(res); + RelInfo *relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups); + int i_reloid = PQfnumber(res, "reloid"); + int i_indtable = PQfnumber(res, "indtable"); + int i_toastheap = PQfnumber(res, "toastheap"); + int 
i_nspname = PQfnumber(res, "nspname"); + int i_relname = PQfnumber(res, "relname"); + int i_relfilenumber = PQfnumber(res, "relfilenode"); + int i_reltablespace = PQfnumber(res, "reltablespace"); + int i_spclocation = PQfnumber(res, "spclocation"); + int num_rels = 0; + char *nspname = NULL; + char *relname = NULL; + char *tablespace = NULL; + char *last_namespace = NULL; + char *last_tablespace = NULL; - for (relnum = 0; relnum < ntups; relnum++) + for (int relnum = 0; relnum < ntups; relnum++) { RelInfo *curr = &relinfos[num_rels++]; @@ -614,9 +609,6 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) /* A zero reltablespace oid indicates the database tablespace. */ curr->tablespace = dbinfo->db_tablespace; } - PQclear(res); - - PQfinish(conn); dbinfo->rel_arr.rels = relinfos; dbinfo->rel_arr.nrels = num_rels; @@ -638,20 +630,9 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * check_old_cluster_for_valid_slots() would raise a FATAL error if such slots * are included. */ -static void -get_old_cluster_logical_slot_infos(DbInfo *dbinfo) +static char * +get_old_cluster_logical_slot_infos_query(void *arg) { - PGconn *conn; - PGresult *res; - LogicalSlotInfo *slotinfos = NULL; - int num_slots; - - /* Logical slots can be migrated since PG17. */ - if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600) - return; - - conn = connectToServer(&old_cluster, dbinfo->db_name); - /* * Fetch the logical replication slot information. The check whether the * slot is considered caught up is done by an upgrade function. This @@ -669,18 +650,23 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo) * started and stopped several times causing any temporary slots to be * removed. 
*/ - res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, " - "%s as caught_up, invalidation_reason IS NOT NULL as invalid " - "FROM pg_catalog.pg_replication_slots " - "WHERE slot_type = 'logical' AND " - "database = current_database() AND " - "temporary IS FALSE;", - user_opts.live_check ? "FALSE" : - "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " - "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " - "END)"); - - num_slots = PQntuples(res); + return psprintf("SELECT slot_name, plugin, two_phase, failover, " + "%s as caught_up, invalidation_reason IS NOT NULL as invalid " + "FROM pg_catalog.pg_replication_slots " + "WHERE slot_type = 'logical' AND " + "database = current_database() AND " + "temporary IS FALSE;", + user_opts.live_check ? "FALSE" : + "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " + "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " + "END)"); +} + +static void +get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult *res, void *arg) +{ + LogicalSlotInfo *slotinfos = NULL; + int num_slots = PQntuples(res); if (num_slots) { @@ -713,14 +699,10 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo) } } - PQclear(res); - PQfinish(conn); - dbinfo->slot_arr.slots = slotinfos; dbinfo->slot_arr.nslots = num_slots; } - /* * count_old_cluster_logical_slots() * -- 2.39.3 (Apple Git-146)
>From 1c822873b79171acb4ab50bf61d7a592b73dac09 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Fri, 28 Jun 2024 21:24:35 -0500 Subject: [PATCH v5 06/13] use new pg_upgrade async API to parallelize getting loadable libraries --- src/bin/pg_upgrade/function.c | 63 ++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 26 deletions(-) diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c index 7e3abed098..da3717e71c 100644 --- a/src/bin/pg_upgrade/function.c +++ b/src/bin/pg_upgrade/function.c @@ -42,6 +42,32 @@ library_name_compare(const void *p1, const void *p2) ((const LibraryInfo *) p2)->dbnum); } +struct loadable_libraries_state +{ + PGresult **ress; + int totaltups; +}; + +static char * +get_loadable_libraries_query(void *arg) +{ + return psprintf("SELECT DISTINCT probin " + "FROM pg_catalog.pg_proc " + "WHERE prolang = %u AND " + "probin IS NOT NULL AND " + "oid >= %u;", + ClanguageId, + FirstNormalObjectId); +} + +static void +get_loadable_libraries_result(DbInfo *dbinfo, PGresult *res, void *arg) +{ + struct loadable_libraries_state *state = (struct loadable_libraries_state *) arg; + + state->ress[dbinfo - old_cluster.dbarr.dbs] = res; + state->totaltups += PQntuples(res); +} /* * get_loadable_libraries() @@ -54,47 +80,32 @@ library_name_compare(const void *p1, const void *p2) void get_loadable_libraries(void) { - PGresult **ress; int totaltups; int dbnum; int n_libinfos; + AsyncTask *task = async_task_create(); + struct loadable_libraries_state state; - ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *)); - totaltups = 0; + state.ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *)); + state.totaltups = 0; - /* Fetch all library names, removing duplicates within each DB */ - for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) - { - DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(&old_cluster, 
active_db->db_name); + async_task_add_step(task, get_loadable_libraries_query, + get_loadable_libraries_result, false, &state); - /* - * Fetch all libraries containing non-built-in C functions in this DB. - */ - ress[dbnum] = executeQueryOrDie(conn, - "SELECT DISTINCT probin " - "FROM pg_catalog.pg_proc " - "WHERE prolang = %u AND " - "probin IS NOT NULL AND " - "oid >= %u;", - ClanguageId, - FirstNormalObjectId); - totaltups += PQntuples(ress[dbnum]); - - PQfinish(conn); - } + async_task_run(task, &old_cluster); + async_task_free(task); /* * Allocate memory for required libraries and logical replication output * plugins. */ - n_libinfos = totaltups + count_old_cluster_logical_slots(); + n_libinfos = state.totaltups + count_old_cluster_logical_slots(); os_info.libraries = (LibraryInfo *) pg_malloc(sizeof(LibraryInfo) * n_libinfos); totaltups = 0; for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) { - PGresult *res = ress[dbnum]; + PGresult *res = state.ress[dbnum]; int ntups; int rowno; LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr; @@ -129,7 +140,7 @@ get_loadable_libraries(void) } } - pg_free(ress); + pg_free(state.ress); os_info.num_libraries = totaltups; } -- 2.39.3 (Apple Git-146)
>From 274965e786fbaf369f01eb7ea24319b220580c7d Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Fri, 28 Jun 2024 21:31:57 -0500 Subject: [PATCH v5 07/13] use new pg_upgrade async API to parallelize reporting extension updates --- src/bin/pg_upgrade/version.c | 82 ++++++++++++++++++------------------ 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/src/bin/pg_upgrade/version.c b/src/bin/pg_upgrade/version.c index 2de6dffccd..bfcf08c936 100644 --- a/src/bin/pg_upgrade/version.c +++ b/src/bin/pg_upgrade/version.c @@ -139,6 +139,42 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode) check_ok(); } +static char * +report_extension_updates_query(void *arg) +{ + return pg_strdup("SELECT name " + "FROM pg_available_extensions " + "WHERE installed_version != default_version"); +} + +static void +report_extension_updates_result(DbInfo *dbinfo, PGresult *res, void *arg) +{ + bool db_used = false; + int ntups = PQntuples(res); + int i_name = PQfnumber(res, "name"); + char *output_path = "update_extensions.sql"; + FILE **script = (FILE **) arg; + + for (int rowno = 0; rowno < ntups; rowno++) + { + if (*script == NULL && (*script = fopen_priv(output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", output_path); + if (!db_used) + { + PQExpBufferData connectbuf; + + initPQExpBuffer(&connectbuf); + appendPsqlMetaConnect(&connectbuf, dbinfo->db_name); + fputs(connectbuf.data, *script); + termPQExpBuffer(&connectbuf); + db_used = true; + } + fprintf(*script, "ALTER EXTENSION %s UPDATE;\n", + quote_identifier(PQgetvalue(res, rowno, i_name))); + } +} + /* * report_extension_updates() * Report extensions that should be updated. 
@@ -146,53 +182,17 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode) void report_extension_updates(ClusterInfo *cluster) { - int dbnum; FILE *script = NULL; char *output_path = "update_extensions.sql"; + AsyncTask *task = async_task_create(); prep_status("Checking for extension updates"); - for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) - { - PGresult *res; - bool db_used = false; - int ntups; - int rowno; - int i_name; - DbInfo *active_db = &cluster->dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(cluster, active_db->db_name); - - /* find extensions needing updates */ - res = executeQueryOrDie(conn, - "SELECT name " - "FROM pg_available_extensions " - "WHERE installed_version != default_version" - ); - - ntups = PQntuples(res); - i_name = PQfnumber(res, "name"); - for (rowno = 0; rowno < ntups; rowno++) - { - if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - if (!db_used) - { - PQExpBufferData connectbuf; - - initPQExpBuffer(&connectbuf); - appendPsqlMetaConnect(&connectbuf, active_db->db_name); - fputs(connectbuf.data, script); - termPQExpBuffer(&connectbuf); - db_used = true; - } - fprintf(script, "ALTER EXTENSION %s UPDATE;\n", - quote_identifier(PQgetvalue(res, rowno, i_name))); - } + async_task_add_step(task, report_extension_updates_query, + report_extension_updates_result, true, &script); - PQclear(res); - - PQfinish(conn); - } + async_task_run(task, cluster); + async_task_free(task); if (script) { -- 2.39.3 (Apple Git-146)
>From c546c0106c0ff79e8bb74bdd4a902ab53f33f4f5 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Sat, 6 Jul 2024 21:06:31 -0500 Subject: [PATCH v5 08/13] parallelize data type checks in pg_upgrade --- src/bin/pg_upgrade/check.c | 319 +++++++++++++++++++------------------ 1 file changed, 160 insertions(+), 159 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 41a23e7efe..9877c7012f 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -32,6 +32,10 @@ static void check_new_cluster_subscription_configuration(void); static void check_old_cluster_for_valid_slots(void); static void check_old_cluster_subscription_state(void); +static bool *data_type_check_results; +static bool data_type_check_failed; +static PQExpBufferData data_type_check_report; + /* * DataTypesUsageChecks - definitions of data type checks for the old cluster * in order to determine if an upgrade can be performed. See the comment on @@ -314,6 +318,129 @@ static DataTypesUsageChecks data_types_usage_checks[] = } }; +static char * +data_type_check_query(void *arg) +{ + int i = (int) (intptr_t) arg; + DataTypesUsageChecks *check = &data_types_usage_checks[i]; + + return psprintf("WITH RECURSIVE oids AS ( " + /* start with the type(s) returned by base_query */ + " %s " + " UNION ALL " + " SELECT * FROM ( " + /* inner WITH because we can only reference the CTE once */ + " WITH x AS (SELECT oid FROM oids) " + /* domains on any type selected so far */ + " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' " + " UNION ALL " + /* arrays over any type selected so far */ + " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' " + " UNION ALL " + /* composite types containing any type selected so far */ + " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x " + " WHERE t.typtype = 'c' AND " + " t.oid = c.reltype AND " + " 
c.oid = a.attrelid AND " + " NOT a.attisdropped AND " + " a.atttypid = x.oid " + " UNION ALL " + /* ranges containing any type selected so far */ + " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x " + " WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid" + " ) foo " + ") " + /* now look for stored columns of any such type */ + "SELECT n.nspname, c.relname, a.attname " + "FROM pg_catalog.pg_class c, " + " pg_catalog.pg_namespace n, " + " pg_catalog.pg_attribute a " + "WHERE c.oid = a.attrelid AND " + " NOT a.attisdropped AND " + " a.atttypid IN (SELECT oid FROM oids) AND " + " c.relkind IN (" + CppAsString2(RELKIND_RELATION) ", " + CppAsString2(RELKIND_MATVIEW) ", " + CppAsString2(RELKIND_INDEX) ") AND " + " c.relnamespace = n.oid AND " + /* exclude possible orphaned temp tables */ + " n.nspname !~ '^pg_temp_' AND " + " n.nspname !~ '^pg_toast_temp_' AND " + /* exclude system catalogs, too */ + " n.nspname NOT IN ('pg_catalog', 'information_schema')", + check->base_query); +} + +static void +data_type_check_process(DbInfo *dbinfo, PGresult *res, void *arg) +{ + int i = (int) (intptr_t) arg; + DataTypesUsageChecks *check = &data_types_usage_checks[i]; + int ntups = PQntuples(res); + + if (ntups) + { + char output_path[MAXPGPATH]; + int i_nspname; + int i_relname; + int i_attname; + FILE *script = NULL; + bool db_used = false; + + snprintf(output_path, sizeof(output_path), "%s/%s", + log_opts.basedir, + check->report_filename); + + /* + * Make sure we have a buffer to save reports to now that we found a + * first failing check. + */ + if (!data_type_check_failed) + initPQExpBuffer(&data_type_check_report); + data_type_check_failed = true; + + /* + * If this is the first time we see an error for the check in question + * then print a status message of the failure. 
+ */ + if (!data_type_check_results[i]) + { + pg_log(PG_REPORT, " failed check: %s", _(check->status)); + appendPQExpBuffer(&data_type_check_report, "\n%s\n%s %s\n", + _(check->report_text), + _("A list of the problem columns is in the file:"), + output_path); + } + data_type_check_results[i] = true; + + i_nspname = PQfnumber(res, "nspname"); + i_relname = PQfnumber(res, "relname"); + i_attname = PQfnumber(res, "attname"); + + for (int rowno = 0; rowno < ntups; rowno++) + { + if (script == NULL && (script = fopen_priv(output_path, "a")) == NULL) + pg_fatal("could not open file \"%s\": %m", output_path); + + if (!db_used) + { + fprintf(script, "In database: %s\n", dbinfo->db_name); + db_used = true; + } + fprintf(script, " %s.%s.%s\n", + PQgetvalue(res, rowno, i_nspname), + PQgetvalue(res, rowno, i_relname), + PQgetvalue(res, rowno, i_attname)); + } + + if (script) + { + fclose(script); + script = NULL; + } + } +} + /* * check_for_data_types_usage() * Detect whether there are any stored columns depending on given type(s) @@ -337,10 +464,9 @@ static void check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks) { bool found = false; - bool *results; - PQExpBufferData report; DataTypesUsageChecks *tmp = checks; int n_data_types_usage_checks = 0; + AsyncTask *task = async_task_create(); prep_status("Checking for data type usage"); @@ -352,176 +478,51 @@ check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks) } /* Prepare an array to store the results of checks in */ - results = pg_malloc0(sizeof(bool) * n_data_types_usage_checks); + data_type_check_results = pg_malloc0(sizeof(bool) * n_data_types_usage_checks); + data_type_check_failed = false; - /* - * Connect to each database in the cluster and run all defined checks - * against that database before trying the next one. 
- */ - for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) + for (int i = 0; i < n_data_types_usage_checks; i++) { - DbInfo *active_db = &cluster->dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(cluster, active_db->db_name); + DataTypesUsageChecks *check = &checks[i]; - for (int checknum = 0; checknum < n_data_types_usage_checks; checknum++) + if (check->threshold_version == MANUAL_CHECK) { - PGresult *res; - int ntups; - int i_nspname; - int i_relname; - int i_attname; - FILE *script = NULL; - bool db_used = false; - char output_path[MAXPGPATH]; - DataTypesUsageChecks *cur_check = &checks[checknum]; - - if (cur_check->threshold_version == MANUAL_CHECK) - { - Assert(cur_check->version_hook); - - /* - * Make sure that the check applies to the current cluster - * version and skip if not. If no check hook has been defined - * we run the check for all versions. - */ - if (!cur_check->version_hook(cluster)) - continue; - } - else if (cur_check->threshold_version != ALL_VERSIONS) - { - if (GET_MAJOR_VERSION(cluster->major_version) > cur_check->threshold_version) - continue; - } - else - Assert(cur_check->threshold_version == ALL_VERSIONS); - - snprintf(output_path, sizeof(output_path), "%s/%s", - log_opts.basedir, - cur_check->report_filename); - - /* - * The type(s) of interest might be wrapped in a domain, array, - * composite, or range, and these container types can be nested - * (to varying extents depending on server version, but that's not - * of concern here). To handle all these cases we need a - * recursive CTE. 
- */ - res = executeQueryOrDie(conn, - "WITH RECURSIVE oids AS ( " - /* start with the type(s) returned by base_query */ - " %s " - " UNION ALL " - " SELECT * FROM ( " - /* inner WITH because we can only reference the CTE once */ - " WITH x AS (SELECT oid FROM oids) " - /* domains on any type selected so far */ - " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' " - " UNION ALL " - /* arrays over any type selected so far */ - " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' " - " UNION ALL " - /* composite types containing any type selected so far */ - " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x " - " WHERE t.typtype = 'c' AND " - " t.oid = c.reltype AND " - " c.oid = a.attrelid AND " - " NOT a.attisdropped AND " - " a.atttypid = x.oid " - " UNION ALL " - /* ranges containing any type selected so far */ - " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x " - " WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid" - " ) foo " - ") " - /* now look for stored columns of any such type */ - "SELECT n.nspname, c.relname, a.attname " - "FROM pg_catalog.pg_class c, " - " pg_catalog.pg_namespace n, " - " pg_catalog.pg_attribute a " - "WHERE c.oid = a.attrelid AND " - " NOT a.attisdropped AND " - " a.atttypid IN (SELECT oid FROM oids) AND " - " c.relkind IN (" - CppAsString2(RELKIND_RELATION) ", " - CppAsString2(RELKIND_MATVIEW) ", " - CppAsString2(RELKIND_INDEX) ") AND " - " c.relnamespace = n.oid AND " - /* exclude possible orphaned temp tables */ - " n.nspname !~ '^pg_temp_' AND " - " n.nspname !~ '^pg_toast_temp_' AND " - /* exclude system catalogs, too */ - " n.nspname NOT IN ('pg_catalog', 'information_schema')", - cur_check->base_query); - - ntups = PQntuples(res); + Assert(check->version_hook); /* - * The datatype was found, so extract the data and log to the - * requested filename. 
We need to open the file for appending - * since the check might have already found the type in another - * database earlier in the loop. + * Make sure that the check applies to the current cluster version + * and skip it if not. */ - if (ntups) - { - /* - * Make sure we have a buffer to save reports to now that we - * found a first failing check. - */ - if (!found) - initPQExpBuffer(&report); - found = true; - - /* - * If this is the first time we see an error for the check in - * question then print a status message of the failure. - */ - if (!results[checknum]) - { - pg_log(PG_REPORT, " failed check: %s", _(cur_check->status)); - appendPQExpBuffer(&report, "\n%s\n%s %s\n", - _(cur_check->report_text), - _("A list of the problem columns is in the file:"), - output_path); - } - results[checknum] = true; - - i_nspname = PQfnumber(res, "nspname"); - i_relname = PQfnumber(res, "relname"); - i_attname = PQfnumber(res, "attname"); - - for (int rowno = 0; rowno < ntups; rowno++) - { - if (script == NULL && (script = fopen_priv(output_path, "a")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - - if (!db_used) - { - fprintf(script, "In database: %s\n", active_db->db_name); - db_used = true; - } - fprintf(script, " %s.%s.%s\n", - PQgetvalue(res, rowno, i_nspname), - PQgetvalue(res, rowno, i_relname), - PQgetvalue(res, rowno, i_attname)); - } - - if (script) - { - fclose(script); - script = NULL; - } - } - - PQclear(res); + if (!check->version_hook(cluster)) + continue; } + else if (check->threshold_version != ALL_VERSIONS) + { + if (GET_MAJOR_VERSION(cluster->major_version) > check->threshold_version) + continue; + } + else + Assert(check->threshold_version == ALL_VERSIONS); - PQfinish(conn); + async_task_add_step(task, data_type_check_query, + data_type_check_process, true, + (void *) (intptr_t) i); } + /* + * Connect to each database in the cluster and run all defined checks + * against that database before trying the next one. 
+ */ + async_task_run(task, cluster); + async_task_free(task); + if (found) - pg_fatal("Data type checks failed: %s", report.data); + { + pg_fatal("Data type checks failed: %s", data_type_check_report.data); + termPQExpBuffer(&data_type_check_report); + } - pg_free(results); + pg_free(data_type_check_results); check_ok(); } -- 2.39.3 (Apple Git-146)
>From b882aa11fb05c4985934e45496e7c374c8bf2461 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Mon, 8 Jul 2024 21:00:20 -0500 Subject: [PATCH v5 09/13] parallelize isn and int8 passing mismatch check in pg_upgrade --- src/bin/pg_upgrade/check.c | 85 ++++++++++++++++++++------------------ 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 9877c7012f..505ebd1caa 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1193,6 +1193,44 @@ check_for_prepared_transactions(ClusterInfo *cluster) check_ok(); } +static char * +isn_and_int8_passing_mismatch_query(void *arg) +{ + return pg_strdup("SELECT n.nspname, p.proname " + "FROM pg_catalog.pg_proc p, " + " pg_catalog.pg_namespace n " + "WHERE p.pronamespace = n.oid AND " + " p.probin = '$libdir/isn'"); +} + +static void +isn_and_int8_passing_mismatch_process(DbInfo *dbinfo, PGresult *res, void *arg) +{ + bool db_used = false; + int ntups = PQntuples(res); + int i_nspname = PQfnumber(res, "nspname"); + int i_proname = PQfnumber(res, "proname"); + FILE **script = (FILE **) arg; + char output_path[MAXPGPATH]; + + snprintf(output_path, sizeof(output_path), "%s/%s", + log_opts.basedir, + "contrib_isn_and_int8_pass_by_value.txt"); + + for (int rowno = 0; rowno < ntups; rowno++) + { + if (*script == NULL && (*script = fopen_priv(output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", output_path); + if (!db_used) + { + fprintf(*script, "In database: %s\n", dbinfo->db_name); + db_used = true; + } + fprintf(*script, " %s.%s\n", + PQgetvalue(res, rowno, i_nspname), + PQgetvalue(res, rowno, i_proname)); + } +} /* * check_for_isn_and_int8_passing_mismatch() @@ -1204,8 +1242,8 @@ check_for_prepared_transactions(ClusterInfo *cluster) static void check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster) { - int dbnum; FILE *script = NULL; + AsyncTask *task; char output_path[MAXPGPATH]; 
prep_status("Checking for contrib/isn with bigint-passing mismatch"); @@ -1222,46 +1260,11 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster) log_opts.basedir, "contrib_isn_and_int8_pass_by_value.txt"); - for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) - { - PGresult *res; - bool db_used = false; - int ntups; - int rowno; - int i_nspname, - i_proname; - DbInfo *active_db = &cluster->dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(cluster, active_db->db_name); - - /* Find any functions coming from contrib/isn */ - res = executeQueryOrDie(conn, - "SELECT n.nspname, p.proname " - "FROM pg_catalog.pg_proc p, " - " pg_catalog.pg_namespace n " - "WHERE p.pronamespace = n.oid AND " - " p.probin = '$libdir/isn'"); - - ntups = PQntuples(res); - i_nspname = PQfnumber(res, "nspname"); - i_proname = PQfnumber(res, "proname"); - for (rowno = 0; rowno < ntups; rowno++) - { - if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - if (!db_used) - { - fprintf(script, "In database: %s\n", active_db->db_name); - db_used = true; - } - fprintf(script, " %s.%s\n", - PQgetvalue(res, rowno, i_nspname), - PQgetvalue(res, rowno, i_proname)); - } - - PQclear(res); - - PQfinish(conn); - } + task = async_task_create(); + async_task_add_step(task, isn_and_int8_passing_mismatch_query, + isn_and_int8_passing_mismatch_process, true, &script); + async_task_run(task, cluster); + async_task_free(task); if (script) { -- 2.39.3 (Apple Git-146)
>From 79c3f7b4303b6e7a857b02f2194c06cb92d42301 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Mon, 8 Jul 2024 21:12:49 -0500 Subject: [PATCH v5 10/13] parallelize user defined postfix ops check in pg_upgrade --- src/bin/pg_upgrade/check.c | 134 +++++++++++++++++++------------------ 1 file changed, 69 insertions(+), 65 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 505ebd1caa..664039b441 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1282,15 +1282,79 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster) check_ok(); } +static char * +user_defined_postfix_ops_query(void *arg) +{ + /* + * The query below hardcodes FirstNormalObjectId as 16384 rather than + * interpolating that C #define into the query because, if that #define is + * ever changed, the cutoff we want to use is the value used by + * pre-version 14 servers, not that of some future version. + */ + return pg_strdup("SELECT o.oid AS oproid, " + " n.nspname AS oprnsp, " + " o.oprname, " + " tn.nspname AS typnsp, " + " t.typname " + "FROM pg_catalog.pg_operator o, " + " pg_catalog.pg_namespace n, " + " pg_catalog.pg_type t, " + " pg_catalog.pg_namespace tn " + "WHERE o.oprnamespace = n.oid AND " + " o.oprleft = t.oid AND " + " t.typnamespace = tn.oid AND " + " o.oprright = 0 AND " + " o.oid >= 16384"); +} + +static void +user_defined_postfix_ops_process(DbInfo *dbinfo, PGresult *res, void *arg) +{ + FILE **script = (FILE **) arg; + char output_path[MAXPGPATH]; + int ntups = PQntuples(res); + bool db_used = false; + int i_oproid = PQfnumber(res, "oproid"); + int i_oprnsp = PQfnumber(res, "oprnsp"); + int i_oprname = PQfnumber(res, "oprname"); + int i_typnsp = PQfnumber(res, "typnsp"); + int i_typname = PQfnumber(res, "typname"); + + if (!ntups) + return; + + snprintf(output_path, sizeof(output_path), "%s/%s", + log_opts.basedir, + "postfix_ops.txt"); + + for (int rowno = 0; rowno < ntups; rowno++) + { + 
if (*script == NULL && + (*script = fopen_priv(output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", output_path); + if (!db_used) + { + fprintf(*script, "In database: %s\n", dbinfo->db_name); + db_used = true; + } + fprintf(*script, " (oid=%s) %s.%s (%s.%s, NONE)\n", + PQgetvalue(res, rowno, i_oproid), + PQgetvalue(res, rowno, i_oprnsp), + PQgetvalue(res, rowno, i_oprname), + PQgetvalue(res, rowno, i_typnsp), + PQgetvalue(res, rowno, i_typname)); + } +} + /* * Verify that no user defined postfix operators exist. */ static void check_for_user_defined_postfix_ops(ClusterInfo *cluster) { - int dbnum; FILE *script = NULL; char output_path[MAXPGPATH]; + AsyncTask *task = async_task_create(); prep_status("Checking for user-defined postfix operators"); @@ -1298,70 +1362,10 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster) log_opts.basedir, "postfix_ops.txt"); - /* Find any user defined postfix operators */ - for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) - { - PGresult *res; - bool db_used = false; - int ntups; - int rowno; - int i_oproid, - i_oprnsp, - i_oprname, - i_typnsp, - i_typname; - DbInfo *active_db = &cluster->dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(cluster, active_db->db_name); - - /* - * The query below hardcodes FirstNormalObjectId as 16384 rather than - * interpolating that C #define into the query because, if that - * #define is ever changed, the cutoff we want to use is the value - * used by pre-version 14 servers, not that of some future version. 
- */ - res = executeQueryOrDie(conn, - "SELECT o.oid AS oproid, " - " n.nspname AS oprnsp, " - " o.oprname, " - " tn.nspname AS typnsp, " - " t.typname " - "FROM pg_catalog.pg_operator o, " - " pg_catalog.pg_namespace n, " - " pg_catalog.pg_type t, " - " pg_catalog.pg_namespace tn " - "WHERE o.oprnamespace = n.oid AND " - " o.oprleft = t.oid AND " - " t.typnamespace = tn.oid AND " - " o.oprright = 0 AND " - " o.oid >= 16384"); - ntups = PQntuples(res); - i_oproid = PQfnumber(res, "oproid"); - i_oprnsp = PQfnumber(res, "oprnsp"); - i_oprname = PQfnumber(res, "oprname"); - i_typnsp = PQfnumber(res, "typnsp"); - i_typname = PQfnumber(res, "typname"); - for (rowno = 0; rowno < ntups; rowno++) - { - if (script == NULL && - (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - if (!db_used) - { - fprintf(script, "In database: %s\n", active_db->db_name); - db_used = true; - } - fprintf(script, " (oid=%s) %s.%s (%s.%s, NONE)\n", - PQgetvalue(res, rowno, i_oproid), - PQgetvalue(res, rowno, i_oprnsp), - PQgetvalue(res, rowno, i_oprname), - PQgetvalue(res, rowno, i_typnsp), - PQgetvalue(res, rowno, i_typname)); - } - - PQclear(res); - - PQfinish(conn); - } + async_task_add_step(task, user_defined_postfix_ops_query, + user_defined_postfix_ops_process, true, &script); + async_task_run(task, cluster); + async_task_free(task); if (script) { -- 2.39.3 (Apple Git-146)
>From 3ee389a9ffebf05fb691beb16b0e0a5f94d02f9b Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Mon, 8 Jul 2024 21:30:19 -0500 Subject: [PATCH v5 11/13] parallelize incompatible polymorphics check in pg_upgrade --- src/bin/pg_upgrade/check.c | 174 ++++++++++++++++--------------- src/tools/pgindent/typedefs.list | 1 + 2 files changed, 92 insertions(+), 83 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 664039b441..614464dfd3 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1381,6 +1381,81 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster) check_ok(); } +typedef struct incompat_polymorphics_state +{ + FILE *script; + PQExpBufferData old_polymorphics; + char output_path[MAXPGPATH]; +} incompat_polymorphics_state; + +static char * +incompat_polymorphics_query(void *arg) +{ + incompat_polymorphics_state *state = (incompat_polymorphics_state *) arg; + + /* + * The query below hardcodes FirstNormalObjectId as 16384 rather than + * interpolating that C #define into the query because, if that #define is + * ever changed, the cutoff we want to use is the value used by + * pre-version 14 servers, not that of some future version. 
+ */ + /* Aggregate transition functions */ + return psprintf("SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname " + "FROM pg_proc AS p " + "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid " + "JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn " + "WHERE p.oid >= 16384 " + "AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) " + "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) " + + /* Aggregate final functions */ + "UNION ALL " + "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname " + "FROM pg_proc AS p " + "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid " + "JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn " + "WHERE p.oid >= 16384 " + "AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) " + "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) " + + /* Operators */ + "UNION ALL " + "SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname " + "FROM pg_operator AS op " + "WHERE op.oid >= 16384 " + "AND oprcode = ANY(ARRAY[%s]::regprocedure[]) " + "AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[]);", + state->old_polymorphics.data, + state->old_polymorphics.data, + state->old_polymorphics.data); +} + +static void +incompat_polymorphics_process(DbInfo *dbinfo, PGresult *res, void *arg) +{ + incompat_polymorphics_state *state = (incompat_polymorphics_state *) arg; + bool db_used = false; + int ntups = PQntuples(res); + int i_objkind = PQfnumber(res, "objkind"); + int i_objname = PQfnumber(res, "objname"); + + for (int rowno = 0; rowno < ntups; rowno++) + { + if (state->script == NULL && + (state->script = fopen_priv(state->output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", state->output_path); + if (!db_used) + { + fprintf(state->script, "In database: %s\n", dbinfo->db_name); + db_used = true; + } + + fprintf(state->script, " %s: %s\n", + PQgetvalue(res, rowno, i_objkind), + PQgetvalue(res, rowno, i_objname)); + } +} + /* * 
check_for_incompatible_polymorphics() * @@ -1390,111 +1465,44 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster) static void check_for_incompatible_polymorphics(ClusterInfo *cluster) { - PGresult *res; - FILE *script = NULL; - char output_path[MAXPGPATH]; - PQExpBufferData old_polymorphics; + AsyncTask *task = async_task_create(); + incompat_polymorphics_state state; prep_status("Checking for incompatible polymorphic functions"); - snprintf(output_path, sizeof(output_path), "%s/%s", + state.script = NULL; + snprintf(state.output_path, sizeof(state.output_path), "%s/%s", log_opts.basedir, "incompatible_polymorphics.txt"); /* The set of problematic functions varies a bit in different versions */ - initPQExpBuffer(&old_polymorphics); + initPQExpBuffer(&state.old_polymorphics); - appendPQExpBufferStr(&old_polymorphics, + appendPQExpBufferStr(&state.old_polymorphics, "'array_append(anyarray,anyelement)'" ", 'array_cat(anyarray,anyarray)'" ", 'array_prepend(anyelement,anyarray)'"); if (GET_MAJOR_VERSION(cluster->major_version) >= 903) - appendPQExpBufferStr(&old_polymorphics, + appendPQExpBufferStr(&state.old_polymorphics, ", 'array_remove(anyarray,anyelement)'" ", 'array_replace(anyarray,anyelement,anyelement)'"); if (GET_MAJOR_VERSION(cluster->major_version) >= 905) - appendPQExpBufferStr(&old_polymorphics, + appendPQExpBufferStr(&state.old_polymorphics, ", 'array_position(anyarray,anyelement)'" ", 'array_position(anyarray,anyelement,integer)'" ", 'array_positions(anyarray,anyelement)'" ", 'width_bucket(anyelement,anyarray)'"); - for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) - { - bool db_used = false; - DbInfo *active_db = &cluster->dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(cluster, active_db->db_name); - int ntups; - int i_objkind, - i_objname; - - /* - * The query below hardcodes FirstNormalObjectId as 16384 rather than - * interpolating that C #define into the query because, if that - * #define is ever changed, the cutoff we want to 
use is the value - * used by pre-version 14 servers, not that of some future version. - */ - res = executeQueryOrDie(conn, - /* Aggregate transition functions */ - "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname " - "FROM pg_proc AS p " - "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid " - "JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn " - "WHERE p.oid >= 16384 " - "AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) " - "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) " - - /* Aggregate final functions */ - "UNION ALL " - "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname " - "FROM pg_proc AS p " - "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid " - "JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn " - "WHERE p.oid >= 16384 " - "AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) " - "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) " - - /* Operators */ - "UNION ALL " - "SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname " - "FROM pg_operator AS op " - "WHERE op.oid >= 16384 " - "AND oprcode = ANY(ARRAY[%s]::regprocedure[]) " - "AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[]);", - old_polymorphics.data, - old_polymorphics.data, - old_polymorphics.data); - - ntups = PQntuples(res); - - i_objkind = PQfnumber(res, "objkind"); - i_objname = PQfnumber(res, "objname"); - - for (int rowno = 0; rowno < ntups; rowno++) - { - if (script == NULL && - (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - if (!db_used) - { - fprintf(script, "In database: %s\n", active_db->db_name); - db_used = true; - } - - fprintf(script, " %s: %s\n", - PQgetvalue(res, rowno, i_objkind), - PQgetvalue(res, rowno, i_objname)); - } - - PQclear(res); - PQfinish(conn); - } + async_task_add_step(task, incompat_polymorphics_query, + incompat_polymorphics_process, true, &state); + async_task_run(task, cluster); + async_task_free(task); - 
if (script) + if (state.script) { - fclose(script); + fclose(state.script); pg_log(PG_REPORT, "fatal"); pg_fatal("Your installation contains user-defined objects that refer to internal\n" "polymorphic functions with arguments of type \"anyarray\" or \"anyelement\".\n" @@ -1502,12 +1510,12 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster) "afterwards, changing them to refer to the new corresponding functions with\n" "arguments of type \"anycompatiblearray\" and \"anycompatible\".\n" "A list of the problematic objects is in the file:\n" - " %s", output_path); + " %s", state.output_path); } else check_ok(); - termPQExpBuffer(&old_polymorphics); + termPQExpBuffer(&state.old_polymorphics); } /* diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index a2330f4346..66db3d3e2c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3536,6 +3536,7 @@ hstoreUpgrade_t hyperLogLogState ifState import_error_callback_arg +incompat_polymorphics_state indexed_tlist inet inetKEY -- 2.39.3 (Apple Git-146)
>From 20d8f0905226a2e2d05deff83054f7ba7561d5f1 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Mon, 8 Jul 2024 21:42:22 -0500 Subject: [PATCH v5 12/13] parallelize tables with oids check in pg_upgrade --- src/bin/pg_upgrade/check.c | 90 ++++++++++++++++++++------------------ 1 file changed, 48 insertions(+), 42 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 614464dfd3..a09eafa16e 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1518,15 +1518,58 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster) termPQExpBuffer(&state.old_polymorphics); } +static char * +with_oids_query(void *arg) +{ + return pg_strdup("SELECT n.nspname, c.relname " + "FROM pg_catalog.pg_class c, " + " pg_catalog.pg_namespace n " + "WHERE c.relnamespace = n.oid AND " + " c.relhasoids AND" + " n.nspname NOT IN ('pg_catalog')"); +} + +static void +with_oids_process(DbInfo *dbinfo, PGresult *res, void *arg) +{ + bool db_used = false; + int ntups = PQntuples(res); + char output_path[MAXPGPATH]; + int i_nspname = PQfnumber(res, "nspname"); + int i_relname = PQfnumber(res, "relname"); + FILE **script = (FILE **) arg; + + if (!ntups) + return; + + snprintf(output_path, sizeof(output_path), "%s/%s", + log_opts.basedir, + "tables_with_oids.txt"); + + for (int rowno = 0; rowno < ntups; rowno++) + { + if (*script == NULL && (*script = fopen_priv(output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", output_path); + if (!db_used) + { + fprintf(*script, "In database: %s\n", dbinfo->db_name); + db_used = true; + } + fprintf(*script, " %s.%s\n", + PQgetvalue(res, rowno, i_nspname), + PQgetvalue(res, rowno, i_relname)); + } +} + /* * Verify that no tables are declared WITH OIDS. 
*/ static void check_for_tables_with_oids(ClusterInfo *cluster) { - int dbnum; FILE *script = NULL; char output_path[MAXPGPATH]; + AsyncTask *task = async_task_create(); prep_status("Checking for tables WITH OIDS"); @@ -1534,47 +1577,10 @@ check_for_tables_with_oids(ClusterInfo *cluster) log_opts.basedir, "tables_with_oids.txt"); - /* Find any tables declared WITH OIDS */ - for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) - { - PGresult *res; - bool db_used = false; - int ntups; - int rowno; - int i_nspname, - i_relname; - DbInfo *active_db = &cluster->dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(cluster, active_db->db_name); - - res = executeQueryOrDie(conn, - "SELECT n.nspname, c.relname " - "FROM pg_catalog.pg_class c, " - " pg_catalog.pg_namespace n " - "WHERE c.relnamespace = n.oid AND " - " c.relhasoids AND" - " n.nspname NOT IN ('pg_catalog')"); - - ntups = PQntuples(res); - i_nspname = PQfnumber(res, "nspname"); - i_relname = PQfnumber(res, "relname"); - for (rowno = 0; rowno < ntups; rowno++) - { - if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - if (!db_used) - { - fprintf(script, "In database: %s\n", active_db->db_name); - db_used = true; - } - fprintf(script, " %s.%s\n", - PQgetvalue(res, rowno, i_nspname), - PQgetvalue(res, rowno, i_relname)); - } - - PQclear(res); - - PQfinish(conn); - } + async_task_add_step(task, with_oids_query, + with_oids_process, true, &script); + async_task_run(task, cluster); + async_task_free(task); if (script) { -- 2.39.3 (Apple Git-146)
>From 86486c2c8e1862ad616967efa7aa025aa60c9283 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Mon, 8 Jul 2024 21:52:13 -0500 Subject: [PATCH v5 13/13] parallelize user defined encoding conversions check in pg_upgrade --- src/bin/pg_upgrade/check.c | 107 ++++++++++++++++++++----------------- 1 file changed, 57 insertions(+), 50 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index a09eafa16e..5ef8169488 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1654,15 +1654,66 @@ check_for_pg_role_prefix(ClusterInfo *cluster) check_ok(); } +static char * +user_defined_encoding_conversions_query(void *arg) +{ + /* + * The query below hardcodes FirstNormalObjectId as 16384 rather than + * interpolating that C #define into the query because, if that #define is + * ever changed, the cutoff we want to use is the value used by + * pre-version 14 servers, not that of some future version. + */ + return pstrdup("SELECT c.oid as conoid, c.conname, n.nspname " + "FROM pg_catalog.pg_conversion c, " + " pg_catalog.pg_namespace n " + "WHERE c.connamespace = n.oid AND " + " c.oid >= 16384"); +} + +static void +user_defined_encoding_conversions_process(DbInfo *dbinfo, PGresult *res, void *arg) +{ + FILE **script = (FILE **) arg; + bool db_used = false; + int ntups = PQntuples(res); + char output_path[MAXPGPATH]; + int i_conoid = PQfnumber(res, "conoid"); + int i_conname = PQfnumber(res, "conname"); + int i_nspname = PQfnumber(res, "nspname"); + + if (!ntups) + return; + + snprintf(output_path, sizeof(output_path), "%s/%s", + log_opts.basedir, + "encoding_conversions.txt"); + + for (int rowno = 0; rowno < ntups; rowno++) + { + if (*script == NULL && + (*script = fopen_priv(output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", output_path); + if (!db_used) + { + fprintf(*script, "In database: %s\n", dbinfo->db_name); + db_used = true; + } + fprintf(*script, " (oid=%s) %s.%s\n", + 
PQgetvalue(res, rowno, i_conoid), + PQgetvalue(res, rowno, i_nspname), + PQgetvalue(res, rowno, i_conname)); + } +} + /* * Verify that no user-defined encoding conversions exist. */ static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster) { - int dbnum; FILE *script = NULL; char output_path[MAXPGPATH]; + AsyncTask *task = async_task_create(); prep_status("Checking for user-defined encoding conversions"); @@ -1670,55 +1721,11 @@ check_for_user_defined_encoding_conversions(ClusterInfo *cluster) log_opts.basedir, "encoding_conversions.txt"); - /* Find any user defined encoding conversions */ - for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) - { - PGresult *res; - bool db_used = false; - int ntups; - int rowno; - int i_conoid, - i_conname, - i_nspname; - DbInfo *active_db = &cluster->dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(cluster, active_db->db_name); - - /* - * The query below hardcodes FirstNormalObjectId as 16384 rather than - * interpolating that C #define into the query because, if that - * #define is ever changed, the cutoff we want to use is the value - * used by pre-version 14 servers, not that of some future version. 
- */ - res = executeQueryOrDie(conn, - "SELECT c.oid as conoid, c.conname, n.nspname " - "FROM pg_catalog.pg_conversion c, " - " pg_catalog.pg_namespace n " - "WHERE c.connamespace = n.oid AND " - " c.oid >= 16384"); - ntups = PQntuples(res); - i_conoid = PQfnumber(res, "conoid"); - i_conname = PQfnumber(res, "conname"); - i_nspname = PQfnumber(res, "nspname"); - for (rowno = 0; rowno < ntups; rowno++) - { - if (script == NULL && - (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - if (!db_used) - { - fprintf(script, "In database: %s\n", active_db->db_name); - db_used = true; - } - fprintf(script, " (oid=%s) %s.%s\n", - PQgetvalue(res, rowno, i_conoid), - PQgetvalue(res, rowno, i_nspname), - PQgetvalue(res, rowno, i_conname)); - } - - PQclear(res); - - PQfinish(conn); - } + async_task_add_step(task, user_defined_encoding_conversions_query, + user_defined_encoding_conversions_process, true, + &script); + async_task_run(task, cluster); + async_task_free(task); if (script) { -- 2.39.3 (Apple Git-146)