I figured I'd post what I have so far since this thread hasn't been updated in a while. The attached patches are still "proof-of-concept grade," but they are at least moving in the right direction (IMHO). The variable naming is still not great, and they are woefully undercommented, among other things.

0001 introduces a new API for registering callbacks and running them in parallel on all databases in the cluster. This new system manages a set of "slots" that follow a simple state machine to asynchronously establish a connection and run the queries. It uses select() to wait for these asynchronous tasks to complete. Users of this API only need to provide two callbacks: one to return the query that should be run on each database and another to process the results of that query. If multiple queries are required for each database, users can provide multiple sets of callbacks.

The other patches change several of the existing tasks to use this new API. With these patches applied, I see the following differences in the output of 'pg_upgrade | ts -i' for a cluster with 1k empty databases:

    WITHOUT PATCH
    00:00:19 Checking database user is the install user ok
    00:00:02 Checking for subscription state ok
    00:00:06 Adding ".old" suffix to old global/pg_control ok
    00:00:04 Checking for extension updates ok

    WITH PATCHES (--jobs 1)
    00:00:10 Checking database user is the install user ok
    00:00:02 Checking for subscription state ok
    00:00:07 Adding ".old" suffix to old global/pg_control ok
    00:00:05 Checking for extension updates ok

    WITH PATCHES (--jobs 4)
    00:00:06 Checking database user is the install user ok
    00:00:00 Checking for subscription state ok
    00:00:02 Adding ".old" suffix to old global/pg_control ok
    00:00:01 Checking for extension updates ok

Note that the "Checking database user is the install user" time also includes the call to get_db_rel_and_slot_infos() on the old cluster as well as the call to get_loadable_libraries() on the old cluster. I believe the improvement with the patches with just one job is due to the consolidation of the queries into one database connection (presently, get_db_rel_and_slot_infos() creates 3 connections per database for some upgrades).
Similarly, the "Adding ".old" suffix to old global/pg_control" time includes the call to get_db_rel_and_slot_infos() on the new cluster.

There are several remaining places where we could use this new API to speed up upgrades. For example, I haven't attempted to use it for the data type checks yet, and that tends to eat up a sizable chunk of time when there are many databases.

On Thu, May 16, 2024 at 08:24:08PM -0500, Nathan Bossart wrote:
> On Thu, May 16, 2024 at 05:09:55PM -0700, Jeff Davis wrote:
>> Also, did you consider connecting once to each database and running
>> many queries? Most of those seem like just checks.
>
> This was the idea behind 347758b. It may be possible to do more along
> these lines. IMO parallelizing will still be useful even if we do combine
> more of the steps.

My current thinking is that any possible further consolidation should happen as part of a follow-up effort to parallelization. I'm cautiously optimistic that the parallelization work will make the consolidation easier since it moves things to rigidly-defined callback functions.

A separate piece of off-list feedback from Michael Paquier is that this new parallel system might be something we can teach the ParallelSlot code used by bin/scripts/ to do. I've yet to look too deeply into this, but I suspect that it will be difficult to combine the two. For example, the ParallelSlot system doesn't seem well-suited for the kind of run-once-in-each-database tasks required by pg_upgrade, and the error handling is probably a little different, too. However, it's still worth a closer look, and I'm interested in folks' opinions on the subject.

--
nathan
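
For anyone who wants to get a feel for the two-callback shape described above without building against libpq, here is a minimal, self-contained sketch. Everything in it is hypothetical: the Demo* types, demo_task_add_step(), demo_task_run(), and the fake "every query returns one row" executor are stand-ins invented for illustration, not the code in the attached patches. It runs the steps serially, so it only shows how callbacks are registered and invoked, not the parallelism.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins for DbInfo and PGresult; not the pg_upgrade types. */
typedef struct DemoDb { const char *name; } DemoDb;
typedef struct DemoResult { int ntuples; } DemoResult;

typedef char *(*DemoGetQueryCB) (DemoDb *db, void *arg);
typedef void (*DemoProcessCB) (DemoDb *db, DemoResult *res, void *arg);

typedef struct DemoStep
{
    DemoGetQueryCB query_cb;
    DemoProcessCB process_cb;
    void *arg;
} DemoStep;

typedef struct DemoTask
{
    DemoStep *steps;
    int nsteps;
} DemoTask;

/* Append one (query, process) callback pair to the task. */
static void
demo_task_add_step(DemoTask *task, DemoGetQueryCB query_cb,
                   DemoProcessCB process_cb, void *arg)
{
    DemoStep *steps = realloc(task->steps,
                              (task->nsteps + 1) * sizeof(DemoStep));

    if (steps == NULL)
        abort();
    task->steps = steps;
    steps[task->nsteps].query_cb = query_cb;
    steps[task->nsteps].process_cb = process_cb;
    steps[task->nsteps].arg = arg;
    task->nsteps++;
}

/*
 * Run every step on every database, one after another.  A real runner would
 * send the query to the server; this mock pretends each query returns one row.
 */
static void
demo_task_run(const DemoTask *task, DemoDb *dbs, int ndbs)
{
    for (int d = 0; d < ndbs; d++)
    {
        for (int s = 0; s < task->nsteps; s++)
        {
            DemoStep *step = &task->steps[s];
            char *query = step->query_cb(&dbs[d], step->arg);
            DemoResult res = {1};

            step->process_cb(&dbs[d], &res, step->arg);
            free(query);        /* the runner owns the query string */
        }
    }
}

/* Query callback: return a freshly allocated query string. */
static char *
count_query(DemoDb *db, void *arg)
{
    const char *sql = "SELECT 1";
    char *copy = malloc(strlen(sql) + 1);

    (void) db;
    (void) arg;
    if (copy == NULL)
        abort();
    strcpy(copy, sql);
    return copy;
}

/* Process callback: accumulate the row count into the caller's counter. */
static void
count_process(DemoDb *db, DemoResult *res, void *arg)
{
    (void) db;
    *(int *) arg += res->ntuples;
}

/* Register two steps, run them on three databases, return the row total. */
int
demo_run_example(void)
{
    DemoDb dbs[] = {{"postgres"}, {"template1"}, {"app"}};
    DemoTask task = {NULL, 0};
    int rows = 0;

    demo_task_add_step(&task, count_query, count_process, &rows);
    demo_task_add_step(&task, count_query, count_process, &rows);
    demo_task_run(&task, dbs, 3);
    free(task.steps);
    return rows;
}
```

The void *arg pointer is what lets a caller thread private state through both callbacks, which is the same role it appears to play in the patches.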
>From d7683a095d4d2c1574005eb41504a5be256d6480 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Fri, 28 Jun 2024 11:02:44 -0500 Subject: [PATCH v2 1/6] introduce framework for parallelizing pg_upgrade tasks --- src/bin/pg_upgrade/Makefile | 1 + src/bin/pg_upgrade/async.c | 323 +++++++++++++++++++++++++++++++ src/bin/pg_upgrade/meson.build | 1 + src/bin/pg_upgrade/pg_upgrade.h | 16 ++ src/tools/pgindent/typedefs.list | 4 + 5 files changed, 345 insertions(+) create mode 100644 src/bin/pg_upgrade/async.c diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile index bde91e2beb..3bc4f5d740 100644 --- a/src/bin/pg_upgrade/Makefile +++ b/src/bin/pg_upgrade/Makefile @@ -12,6 +12,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ $(WIN32RES) \ + async.o \ check.o \ controldata.o \ dump.o \ diff --git a/src/bin/pg_upgrade/async.c b/src/bin/pg_upgrade/async.c new file mode 100644 index 0000000000..7df1e7712d --- /dev/null +++ b/src/bin/pg_upgrade/async.c @@ -0,0 +1,323 @@ +/* + * async.c + * + * parallelization via libpq's async APIs + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * src/bin/pg_upgrade/async.c + */ + +#include "postgres_fe.h" + +#include "common/connect.h" +#include "fe_utils/string_utils.h" +#include "pg_upgrade.h" + +static int dbs_complete; +static int dbs_processing; + +typedef struct AsyncTaskCallbacks +{ + AsyncTaskGetQueryCB query_cb; + AsyncTaskProcessCB process_cb; + bool free_result; + void *arg; +} AsyncTaskCallbacks; + +typedef struct AsyncTask +{ + AsyncTaskCallbacks *cbs; + int num_cb_sets; +} AsyncTask; + +typedef enum +{ + FREE, + CONNECTING, + SETTING_SEARCH_PATH, + RUNNING_QUERY, +} AsyncSlotState; + +typedef struct +{ + AsyncSlotState state; + int db; + int query; + PGconn *conn; +} AsyncSlot; + +AsyncTask * +async_task_create(void) +{ + return pg_malloc0(sizeof(AsyncTask)); +} + +void +async_task_free(AsyncTask *task) +{ + if (task->cbs) + pg_free(task->cbs); + + 
pg_free(task); +} + +void +async_task_add_step(AsyncTask *task, + AsyncTaskGetQueryCB query_cb, + AsyncTaskProcessCB process_cb, bool free_result, + void *arg) +{ + AsyncTaskCallbacks *new_cbs; + + task->cbs = pg_realloc(task->cbs, + ++task->num_cb_sets * sizeof(AsyncTaskCallbacks)); + + new_cbs = &task->cbs[task->num_cb_sets - 1]; + new_cbs->query_cb = query_cb; + new_cbs->process_cb = process_cb; + new_cbs->free_result = free_result; + new_cbs->arg = arg; +} + +static void +conn_failure(PGconn *conn) +{ + pg_log(PG_REPORT, "%s", PQerrorMessage(conn)); + printf(_("Failure, exiting\n")); + exit(1); +} + +static void +start_conn(const ClusterInfo *cluster, AsyncSlot *slot) +{ + PQExpBufferData conn_opts; + + /* Build connection string with proper quoting */ + initPQExpBuffer(&conn_opts); + appendPQExpBufferStr(&conn_opts, "dbname="); + appendConnStrVal(&conn_opts, cluster->dbarr.dbs[slot->db].db_name); + appendPQExpBufferStr(&conn_opts, " user="); + appendConnStrVal(&conn_opts, os_info.user); + appendPQExpBuffer(&conn_opts, " port=%d", cluster->port); + if (cluster->sockdir) + { + appendPQExpBufferStr(&conn_opts, " host="); + appendConnStrVal(&conn_opts, cluster->sockdir); + } + + slot->conn = PQconnectStart(conn_opts.data); + termPQExpBuffer(&conn_opts); + + if (!slot->conn) + conn_failure(slot->conn); +} + +static void +dispatch_query(const ClusterInfo *cluster, AsyncSlot *slot, + const AsyncTask *task) +{ + AsyncTaskCallbacks *cbs = &task->cbs[slot->query]; + AsyncTaskGetQueryCB get_query = cbs->query_cb; + DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db]; + char *query = (*get_query) (dbinfo, cbs->arg); + + if (!PQsendQuery(slot->conn, query)) + conn_failure(slot->conn); + + pg_free(query); +} + +static PGresult * +get_last_result(PGconn *conn) +{ + PGresult *tmp; + PGresult *res = NULL; + + while ((tmp = PQgetResult(conn)) != NULL) + { + PQclear(res); + res = tmp; + if (PQstatus(conn) == CONNECTION_BAD) + conn_failure(conn); + } + + if (PQresultStatus(res) != 
PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK) + conn_failure(conn); + + return res; +} + +static void +process_query_result(const ClusterInfo *cluster, AsyncSlot *slot, + const AsyncTask *task) +{ + AsyncTaskCallbacks *cbs = &task->cbs[slot->query]; + AsyncTaskProcessCB process_cb = cbs->process_cb; + DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db]; + PGresult *res = get_last_result(slot->conn); + + (*process_cb) (dbinfo, res, cbs->arg); + + if (cbs->free_result) + PQclear(res); +} + +static void +process_slot(const ClusterInfo *cluster, AsyncSlot *slot, const AsyncTask *task) +{ + switch (slot->state) + { + case FREE: + if (dbs_processing >= cluster->dbarr.ndbs) + return; + slot->db = dbs_processing++; + slot->state = CONNECTING; + start_conn(cluster, slot); + return; + + case CONNECTING: + if (PQconnectPoll(slot->conn) == PGRES_POLLING_FAILED) + conn_failure(slot->conn); + if (PQconnectPoll(slot->conn) != PGRES_POLLING_OK) + return; + slot->state = SETTING_SEARCH_PATH; + if (!PQsendQuery(slot->conn, ALWAYS_SECURE_SEARCH_PATH_SQL)) + conn_failure(slot->conn); + return; + + case SETTING_SEARCH_PATH: + if (!PQconsumeInput(slot->conn)) + conn_failure(slot->conn); + if (PQisBusy(slot->conn)) + return; + PQclear(get_last_result(slot->conn)); + slot->state = RUNNING_QUERY; + dispatch_query(cluster, slot, task); + return; + + case RUNNING_QUERY: + if (!PQconsumeInput(slot->conn)) + conn_failure(slot->conn); + if (PQisBusy(slot->conn)) + return; + process_query_result(cluster, slot, task); + if (++slot->query >= task->num_cb_sets) + { + dbs_complete++; + PQfinish(slot->conn); + memset(slot, 0, sizeof(AsyncSlot)); + return; + } + dispatch_query(cluster, slot, task); + return; + } +} + +/* + * Wait on the slots to either finish connecting or to receive query results if + * possible. This avoids a tight loop in async_task_run(). 
+ */ +static void +wait_on_slots(AsyncSlot *slots, int numslots) +{ + fd_set input_mask; + fd_set output_mask; + fd_set except_mask; + int maxFd = 0; + + FD_ZERO(&input_mask); + FD_ZERO(&output_mask); + FD_ZERO(&except_mask); + + for (int i = 0; i < numslots; i++) + { + int sock; + bool read = false; + + switch (slots[i].state) + { + case FREE: + + /* + * If we see a free slot, return right away so that it can be + * reused immediately for the next database. This might cause + * us to spin more than necessary as we finish processing the + * last few databases, but that shouldn't cause too much harm. + */ + return; + + case CONNECTING: + + /* + * If we are waiting for the connection to establish, choose + * whether to wait for reading or for writing on the socket as + * appropriate. If neither apply, just return immediately so + * that we can handle the slot. + */ + { + PostgresPollingStatusType status; + + status = PQconnectPoll(slots[i].conn); + if (status == PGRES_POLLING_READING) + read = true; + else if (status != PGRES_POLLING_WRITING) + return; + } + break; + + case SETTING_SEARCH_PATH: + case RUNNING_QUERY: + + /* + * If we've sent a query, we must wait for the socket to be + * read-ready. Note that process_slot() handles calling + * PQconsumeInput() as required. + */ + read = true; + break; + } + + /* + * If there's some problem retrieving the socket, just pretend this + * slot doesn't exist. We don't expect this to happen regularly in + * practice, so it seems unlikely to cause too much harm. + */ + sock = PQsocket(slots[i].conn); + if (sock < 0) + continue; + + /* + * Add the socket to the set. + */ + FD_SET(sock, read ? &input_mask : &output_mask); + FD_SET(sock, &except_mask); + maxFd = Max(maxFd, sock); + } + + /* + * If we found socket(s) to wait on, wait. 
+ */ + if (maxFd != 0) + (void) select(maxFd + 1, &input_mask, &output_mask, &except_mask, NULL); +} + +void +async_task_run(const AsyncTask *task, const ClusterInfo *cluster) +{ + int jobs = Max(1, user_opts.jobs); + AsyncSlot *slots = pg_malloc0(sizeof(AsyncSlot) * jobs); + + dbs_complete = 0; + dbs_processing = 0; + + while (dbs_complete < cluster->dbarr.ndbs) + { + for (int i = 0; i < jobs; i++) + process_slot(cluster, &slots[i], task); + + wait_on_slots(slots, jobs); + } + + pg_free(slots); +} diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build index 9825fa3305..9eb48e176c 100644 --- a/src/bin/pg_upgrade/meson.build +++ b/src/bin/pg_upgrade/meson.build @@ -1,6 +1,7 @@ # Copyright (c) 2022-2024, PostgreSQL Global Development Group pg_upgrade_sources = files( + 'async.c', 'check.c', 'controldata.c', 'dump.c', diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index 8afe240bdf..1ebad3bd74 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -494,3 +494,19 @@ void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr char *old_pgdata, char *new_pgdata, char *old_tablespace); bool reap_child(bool wait_for_child); + +/* async.c */ + +typedef char *(*AsyncTaskGetQueryCB) (DbInfo *dbinfo, void *arg); +typedef void (*AsyncTaskProcessCB) (DbInfo *dbinfo, PGresult *res, void *arg); + +/* struct definition is private to async.c */ +typedef struct AsyncTask AsyncTask; + +AsyncTask *async_task_create(void); +void async_task_add_step(AsyncTask *task, + AsyncTaskGetQueryCB query_cb, + AsyncTaskProcessCB process_cb, bool free_result, + void *arg); +void async_task_run(const AsyncTask *task, const ClusterInfo *cluster); +void async_task_free(AsyncTask *task); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e6c1caf649..3d219cbfe2 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -153,6 +153,10 @@ 
ArrayMetaState ArraySubWorkspace ArrayToken ArrayType +AsyncSlot +AsyncSlotState +AsyncTask +AsyncTaskCallbacks AsyncQueueControl AsyncQueueEntry AsyncRequest -- 2.39.3 (Apple Git-146)
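
To make the slot state machine in 0001 a little easier to reason about, here is a rough simulation of it with all of the libpq calls removed. It is only a sketch under a big assumption: every connection attempt and query is treated as finished by the next pass over the slots, so there is nothing to wait on. The Demo* names and demo_passes() are made up for this example and do not appear in the patch.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef enum
{
    DEMO_FREE,                  /* must be 0 so memset() frees a slot */
    DEMO_CONNECTING,
    DEMO_SETTING_SEARCH_PATH,
    DEMO_RUNNING_QUERY
} DemoSlotState;

typedef struct
{
    DemoSlotState state;
    int db;                     /* index of the database being handled */
    int query;                  /* index of the query currently running */
} DemoSlot;

static int demo_next_db;        /* next database to hand out */
static int demo_done_dbs;       /* databases fully processed */

/* Advance one slot by one state, assuming its pending work has finished. */
static void
demo_process_slot(DemoSlot *slot, int ndbs, int nqueries)
{
    switch (slot->state)
    {
        case DEMO_FREE:
            if (demo_next_db >= ndbs)
                return;         /* nothing left to pick up */
            slot->db = demo_next_db++;
            slot->state = DEMO_CONNECTING;
            return;

        case DEMO_CONNECTING:
            slot->state = DEMO_SETTING_SEARCH_PATH;
            return;

        case DEMO_SETTING_SEARCH_PATH:
            slot->state = DEMO_RUNNING_QUERY;
            return;

        case DEMO_RUNNING_QUERY:
            if (++slot->query >= nqueries)
            {
                demo_done_dbs++;
                memset(slot, 0, sizeof(*slot)); /* back to DEMO_FREE */
            }
            return;
    }
}

/* Count passes over the slot array needed to finish all databases. */
int
demo_passes(int jobs, int ndbs, int nqueries)
{
    DemoSlot *slots = calloc(jobs, sizeof(DemoSlot));
    int passes = 0;

    if (slots == NULL)
        abort();
    demo_next_db = 0;
    demo_done_dbs = 0;

    while (demo_done_dbs < ndbs)
    {
        for (int i = 0; i < jobs; i++)
            demo_process_slot(&slots[i], ndbs, nqueries);
        passes++;
    }

    free(slots);
    return passes;
}
```

With one query per database, demo_passes(1, 4, 1) takes 16 passes while demo_passes(4, 4, 1) takes 4, which is the intuition behind the --jobs speedup. The real loop differs in that a slot only advances once its socket is ready.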
>From c84b3c97cb0befff8027702f1674e809f174b3aa Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Fri, 28 Jun 2024 17:21:19 -0500 Subject: [PATCH v2 2/6] use new pg_upgrade async API for subscription state checks --- src/bin/pg_upgrade/check.c | 200 ++++++++++++++++++++----------------- 1 file changed, 106 insertions(+), 94 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 27924159d6..f653fa25a5 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1906,6 +1906,75 @@ check_old_cluster_for_valid_slots(bool live_check) check_ok(); } +/* private state for subscription state checks */ +struct substate_info +{ + FILE *script; + char output_path[MAXPGPATH]; +}; + +/* + * We don't allow upgrade if there is a risk of dangling slot or origin + * corresponding to initial sync after upgrade. + * + * A slot/origin not created yet refers to the 'i' (initialize) state, while + * 'r' (ready) state refers to a slot/origin created previously but already + * dropped. These states are supported for pg_upgrade. The other states listed + * below are not supported: + * + * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state would + * retain a replication slot, which could not be dropped by the sync worker + * spawned after the upgrade because the subscription ID used for the slot name + * won't match anymore. + * + * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state would + * retain the replication origin when there is a failure in tablesync worker + * immediately after dropping the replication slot in the publisher. + * + * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on a + * relation upgraded while in this state would expect an origin ID with the OID + * of the subscription used before the upgrade, causing it to fail. 
+ * + * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and SUBREL_STATE_UNKNOWN: + * These states are not stored in the catalog, so we need not allow these + * states. + */ +static char * +sub_query(DbInfo *dbinfo, void *arg) +{ + return pg_strdup("SELECT r.srsubstate, s.subname, n.nspname, c.relname " + "FROM pg_catalog.pg_subscription_rel r " + "LEFT JOIN pg_catalog.pg_subscription s" + " ON r.srsubid = s.oid " + "LEFT JOIN pg_catalog.pg_class c" + " ON r.srrelid = c.oid " + "LEFT JOIN pg_catalog.pg_namespace n" + " ON c.relnamespace = n.oid " + "WHERE r.srsubstate NOT IN ('i', 'r') " + "ORDER BY s.subname"); +} + +static void +sub_process(DbInfo *dbinfo, PGresult *res, void *arg) +{ + struct substate_info *state = (struct substate_info *) arg; + int ntup = PQntuples(res); + + for (int i = 0; i < ntup; i++) + { + if (state->script == NULL && + (state->script = fopen_priv(state->output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", state->output_path); + + fprintf(state->script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n", + PQgetvalue(res, i, 0), + dbinfo->db_name, + PQgetvalue(res, i, 1), + PQgetvalue(res, i, 2), + PQgetvalue(res, i, 3)); + } +} + /* * check_old_cluster_subscription_state() * @@ -1916,115 +1985,58 @@ check_old_cluster_for_valid_slots(bool live_check) static void check_old_cluster_subscription_state(void) { - FILE *script = NULL; - char output_path[MAXPGPATH]; + AsyncTask *task = async_task_create(); + struct substate_info state; + PGresult *res; + PGconn *conn; int ntup; prep_status("Checking for subscription state"); - snprintf(output_path, sizeof(output_path), "%s/%s", + state.script = NULL; + snprintf(state.output_path, sizeof(state.output_path), "%s/%s", log_opts.basedir, "subs_invalid.txt"); - for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) - { - PGresult *res; - DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum]; - PGconn *conn = 
connectToServer(&old_cluster, active_db->db_name); - /* We need to check for pg_replication_origin only once. */ - if (dbnum == 0) - { - /* - * Check that all the subscriptions have their respective - * replication origin. - */ - res = executeQueryOrDie(conn, - "SELECT d.datname, s.subname " - "FROM pg_catalog.pg_subscription s " - "LEFT OUTER JOIN pg_catalog.pg_replication_origin o " - " ON o.roname = 'pg_' || s.oid " - "INNER JOIN pg_catalog.pg_database d " - " ON d.oid = s.subdbid " - "WHERE o.roname IS NULL;"); - - ntup = PQntuples(res); - for (int i = 0; i < ntup; i++) - { - if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - fprintf(script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n", - PQgetvalue(res, i, 0), - PQgetvalue(res, i, 1)); - } - PQclear(res); - } - - /* - * We don't allow upgrade if there is a risk of dangling slot or - * origin corresponding to initial sync after upgrade. - * - * A slot/origin not created yet refers to the 'i' (initialize) state, - * while 'r' (ready) state refers to a slot/origin created previously - * but already dropped. These states are supported for pg_upgrade. The - * other states listed below are not supported: - * - * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state - * would retain a replication slot, which could not be dropped by the - * sync worker spawned after the upgrade because the subscription ID - * used for the slot name won't match anymore. - * - * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state - * would retain the replication origin when there is a failure in - * tablesync worker immediately after dropping the replication slot in - * the publisher. 
- * - * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on - * a relation upgraded while in this state would expect an origin ID - * with the OID of the subscription used before the upgrade, causing - * it to fail. - * - * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and - * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog, - * so we need not allow these states. - */ - res = executeQueryOrDie(conn, - "SELECT r.srsubstate, s.subname, n.nspname, c.relname " - "FROM pg_catalog.pg_subscription_rel r " - "LEFT JOIN pg_catalog.pg_subscription s" - " ON r.srsubid = s.oid " - "LEFT JOIN pg_catalog.pg_class c" - " ON r.srrelid = c.oid " - "LEFT JOIN pg_catalog.pg_namespace n" - " ON c.relnamespace = n.oid " - "WHERE r.srsubstate NOT IN ('i', 'r') " - "ORDER BY s.subname"); - - ntup = PQntuples(res); - for (int i = 0; i < ntup; i++) - { - if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); + /* + * Check that all the subscriptions have their respective replication + * origin. This check only needs to run once. 
+ */ + conn = connectToServer(&old_cluster, old_cluster.dbarr.dbs[0].db_name); + res = executeQueryOrDie(conn, + "SELECT d.datname, s.subname " + "FROM pg_catalog.pg_subscription s " + "LEFT OUTER JOIN pg_catalog.pg_replication_origin o " + " ON o.roname = 'pg_' || s.oid " + "INNER JOIN pg_catalog.pg_database d " + " ON d.oid = s.subdbid " + "WHERE o.roname IS NULL;"); + ntup = PQntuples(res); + for (int i = 0; i < ntup; i++) + { + if (state.script == NULL && + (state.script = fopen_priv(state.output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", state.output_path); + fprintf(state.script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n", + PQgetvalue(res, i, 0), + PQgetvalue(res, i, 1)); + } + PQclear(res); + PQfinish(conn); - fprintf(script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n", - PQgetvalue(res, i, 0), - active_db->db_name, - PQgetvalue(res, i, 1), - PQgetvalue(res, i, 2), - PQgetvalue(res, i, 3)); - } + async_task_add_step(task, sub_query, sub_process, true, &state); - PQclear(res); - PQfinish(conn); - } + async_task_run(task, &old_cluster); + async_task_free(task); - if (script) + if (state.script) { - fclose(script); + fclose(state.script); pg_log(PG_REPORT, "fatal"); pg_fatal("Your installation contains subscriptions without origin or having relations not in i (initialize) or r (ready) state.\n" "You can allow the initial sync to finish for all relations and then restart the upgrade.\n" "A list of the problematic subscriptions is in the file:\n" - " %s", output_path); + " %s", state.output_path); } else check_ok(); -- 2.39.3 (Apple Git-146)
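
The part of 0002 that is probably worth calling out is the shared state struct: the report file is opened lazily by whichever callback invocation finds the first problem, and the caller decides pass/fail afterwards by checking whether the file was ever opened. Here is a small standalone sketch of that pattern. DemoReport, demo_report_problem(), and demo_check() are hypothetical names for illustration, and plain fopen() stands in for fopen_priv().

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

/* Private state shared by every callback invocation (hypothetical). */
typedef struct DemoReport
{
    FILE *script;               /* stays NULL until the first problem */
    char output_path[1024];
} DemoReport;

/* Record one problem, opening the report file on first use. */
static void
demo_report_problem(DemoReport *state, const char *db_name, const char *detail)
{
    if (state->script == NULL &&
        (state->script = fopen(state->output_path, "w")) == NULL)
    {
        perror(state->output_path);
        exit(1);
    }
    fprintf(state->script, "database:\"%s\" %s\n", db_name, detail);
}

/* Returns 1 if a report file was written, 0 if every database was clean. */
int
demo_check(const char *path, int ndbs, int problems_per_db)
{
    DemoReport state;

    state.script = NULL;
    snprintf(state.output_path, sizeof(state.output_path), "%s", path);

    /* Stand-in for the per-database process callback being invoked. */
    for (int d = 0; d < ndbs; d++)
    {
        char db_name[32];

        snprintf(db_name, sizeof(db_name), "db%d", d);
        for (int p = 0; p < problems_per_db; p++)
            demo_report_problem(&state, db_name, "has a problem");
    }

    if (state.script == NULL)
        return 0;               /* nothing to report */

    fclose(state.script);
    return 1;
}
```

Since the slots in 0001 are all driven from a single loop in one process, the callbacks should never run concurrently, so a shared FILE pointer like this needs no locking. That is my reading of the patch rather than something it documents.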
>From c9d2c483ac1fabc0897c19a60a1cf6054e1293da Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Tue, 21 May 2024 16:35:19 -0500 Subject: [PATCH v2 3/6] move live_check variable to user_opts --- src/bin/pg_upgrade/check.c | 32 ++++++++++++++++---------------- src/bin/pg_upgrade/controldata.c | 5 +++-- src/bin/pg_upgrade/info.c | 12 +++++------- src/bin/pg_upgrade/option.c | 4 ++-- src/bin/pg_upgrade/pg_upgrade.c | 21 ++++++++++----------- src/bin/pg_upgrade/pg_upgrade.h | 13 +++++++------ 6 files changed, 43 insertions(+), 44 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index f653fa25a5..251f3d9017 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -29,7 +29,7 @@ static void check_for_new_tablespace_dir(void); static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster); static void check_new_cluster_logical_replication_slots(void); static void check_new_cluster_subscription_configuration(void); -static void check_old_cluster_for_valid_slots(bool live_check); +static void check_old_cluster_for_valid_slots(void); static void check_old_cluster_subscription_state(void); /* @@ -555,9 +555,9 @@ fix_path_separator(char *path) } void -output_check_banner(bool live_check) +output_check_banner(void) { - if (user_opts.check && live_check) + if (user_opts.live_check) { pg_log(PG_REPORT, "Performing Consistency Checks on Old Live Server\n" @@ -573,18 +573,18 @@ output_check_banner(bool live_check) void -check_and_dump_old_cluster(bool live_check) +check_and_dump_old_cluster(void) { /* -- OLD -- */ - if (!live_check) + if (!user_opts.live_check) start_postmaster(&old_cluster, true); /* * Extract a list of databases, tables, and logical replication slots from * the old cluster. 
*/ - get_db_rel_and_slot_infos(&old_cluster, live_check); + get_db_rel_and_slot_infos(&old_cluster); init_tablespaces(); @@ -605,7 +605,7 @@ check_and_dump_old_cluster(bool live_check) * Logical replication slots can be migrated since PG17. See comments * atop get_old_cluster_logical_slot_infos(). */ - check_old_cluster_for_valid_slots(live_check); + check_old_cluster_for_valid_slots(); /* * Subscriptions and their dependencies can be migrated since PG17. @@ -652,7 +652,7 @@ check_and_dump_old_cluster(bool live_check) */ if (GET_MAJOR_VERSION(old_cluster.major_version) <= 906) { - if (user_opts.check) + if (user_opts.live_check) old_9_6_invalidate_hash_indexes(&old_cluster, true); } @@ -667,7 +667,7 @@ check_and_dump_old_cluster(bool live_check) if (!user_opts.check) generate_old_dump(); - if (!live_check) + if (!user_opts.live_check) stop_postmaster(false); } @@ -675,7 +675,7 @@ check_and_dump_old_cluster(bool live_check) void check_new_cluster(void) { - get_db_rel_and_slot_infos(&new_cluster, false); + get_db_rel_and_slot_infos(&new_cluster); check_new_cluster_is_empty(); @@ -826,14 +826,14 @@ check_cluster_versions(void) void -check_cluster_compatibility(bool live_check) +check_cluster_compatibility(void) { /* get/check pg_control data of servers */ - get_control_data(&old_cluster, live_check); - get_control_data(&new_cluster, false); + get_control_data(&old_cluster); + get_control_data(&new_cluster); check_control_data(&old_cluster.controldata, &new_cluster.controldata); - if (live_check && old_cluster.port == new_cluster.port) + if (user_opts.live_check && old_cluster.port == new_cluster.port) pg_fatal("When checking a live server, " "the old and new port numbers must be different."); } @@ -1839,7 +1839,7 @@ check_new_cluster_subscription_configuration(void) * before shutdown. 
*/ static void -check_old_cluster_for_valid_slots(bool live_check) +check_old_cluster_for_valid_slots(void) { char output_path[MAXPGPATH]; FILE *script = NULL; @@ -1878,7 +1878,7 @@ check_old_cluster_for_valid_slots(bool live_check) * Note: This can be satisfied only when the old cluster has been * shut down, so we skip this for live checks. */ - if (!live_check && !slot->caught_up) + if (!user_opts.live_check && !slot->caught_up) { if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c index 1f0ccea3ed..cf665b9dee 100644 --- a/src/bin/pg_upgrade/controldata.c +++ b/src/bin/pg_upgrade/controldata.c @@ -33,7 +33,7 @@ * return valid xid data for a running server. */ void -get_control_data(ClusterInfo *cluster, bool live_check) +get_control_data(ClusterInfo *cluster) { char cmd[MAXPGPATH]; char bufin[MAX_STRING]; @@ -76,6 +76,7 @@ get_control_data(ClusterInfo *cluster, bool live_check) uint32 segno = 0; char *resetwal_bin; int rc; + bool live_check = (cluster == &old_cluster && user_opts.live_check); /* * Because we test the pg_resetwal output as strings, it has to be in @@ -118,7 +119,7 @@ get_control_data(ClusterInfo *cluster, bool live_check) /* * Check for clean shutdown */ - if (!live_check || cluster == &new_cluster) + if (!live_check) { /* only pg_controldata outputs the cluster state */ snprintf(cmd, sizeof(cmd), "\"%s/pg_controldata\" \"%s\"", diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 95c22a7200..8f1777de59 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -27,7 +27,7 @@ static void free_rel_infos(RelInfoArr *rel_arr); static void print_db_infos(DbInfoArr *db_arr); static void print_rel_infos(RelInfoArr *rel_arr); static void print_slot_infos(LogicalSlotInfoArr *slot_arr); -static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check); +static void get_old_cluster_logical_slot_infos(DbInfo 
*dbinfo); static void get_db_subscription_count(DbInfo *dbinfo); @@ -273,11 +273,9 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db) * * higher level routine to generate dbinfos for the database running * on the given "port". Assumes that server is already running. - * - * live_check would be used only when the target is the old cluster. */ void -get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check) +get_db_rel_and_slot_infos(ClusterInfo *cluster) { int dbnum; @@ -299,7 +297,7 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check) */ if (cluster == &old_cluster) { - get_old_cluster_logical_slot_infos(pDbInfo, live_check); + get_old_cluster_logical_slot_infos(pDbInfo); get_db_subscription_count(pDbInfo); } } @@ -645,7 +643,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * are included. */ static void -get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) +get_old_cluster_logical_slot_infos(DbInfo *dbinfo) { PGconn *conn; PGresult *res; @@ -681,7 +679,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) "WHERE slot_type = 'logical' AND " "database = current_database() AND " "temporary IS FALSE;", - live_check ? "FALSE" : + user_opts.live_check ? "FALSE" : "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " "END)"); diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c index 548ea4e623..6f41d63eed 100644 --- a/src/bin/pg_upgrade/option.c +++ b/src/bin/pg_upgrade/option.c @@ -470,10 +470,10 @@ adjust_data_dir(ClusterInfo *cluster) * directory. 
*/ void -get_sock_dir(ClusterInfo *cluster, bool live_check) +get_sock_dir(ClusterInfo *cluster) { #if !defined(WIN32) - if (!live_check) + if (!user_opts.live_check || cluster == &new_cluster) cluster->sockdir = user_opts.socketdir; else { diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c index af370768b6..3f4ad7d5cc 100644 --- a/src/bin/pg_upgrade/pg_upgrade.c +++ b/src/bin/pg_upgrade/pg_upgrade.c @@ -65,7 +65,7 @@ static void create_new_objects(void); static void copy_xact_xlog_xid(void); static void set_frozenxids(bool minmxid_only); static void make_outputdirs(char *pgdata); -static void setup(char *argv0, bool *live_check); +static void setup(char *argv0); static void create_logical_replication_slots(void); ClusterInfo old_cluster, @@ -88,7 +88,6 @@ int main(int argc, char **argv) { char *deletion_script_file_name = NULL; - bool live_check = false; /* * pg_upgrade doesn't currently use common/logging.c, but initialize it @@ -123,18 +122,18 @@ main(int argc, char **argv) */ make_outputdirs(new_cluster.pgdata); - setup(argv[0], &live_check); + setup(argv[0]); - output_check_banner(live_check); + output_check_banner(); check_cluster_versions(); - get_sock_dir(&old_cluster, live_check); - get_sock_dir(&new_cluster, false); + get_sock_dir(&old_cluster); + get_sock_dir(&new_cluster); - check_cluster_compatibility(live_check); + check_cluster_compatibility(); - check_and_dump_old_cluster(live_check); + check_and_dump_old_cluster(); /* -- NEW -- */ @@ -331,7 +330,7 @@ make_outputdirs(char *pgdata) static void -setup(char *argv0, bool *live_check) +setup(char *argv0) { /* * make sure the user has a clean environment, otherwise, we may confuse @@ -378,7 +377,7 @@ setup(char *argv0, bool *live_check) pg_fatal("There seems to be a postmaster servicing the old cluster.\n" "Please shutdown that postmaster and try again."); else - *live_check = true; + user_opts.live_check = true; } } @@ -648,7 +647,7 @@ create_new_objects(void) 
 		set_frozenxids(true);

 	/* update new_cluster info now that we have objects in the databases */
-	get_db_rel_and_slot_infos(&new_cluster, false);
+	get_db_rel_and_slot_infos(&new_cluster);
 }

 /*
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 1ebad3bd74..56d05d7eb9 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -322,6 +322,7 @@ typedef struct
 typedef struct
 {
 	bool check; /* check clusters only, don't change any data */
+	bool live_check; /* check clusters only, old server is running */
 	bool do_sync; /* flush changes to disk */
 	transferMode transfer_mode; /* copy files or link them? */
 	int jobs; /* number of processes/threads to use */
@@ -366,20 +367,20 @@ extern OSInfo os_info;

 /* check.c */

-void output_check_banner(bool live_check);
-void check_and_dump_old_cluster(bool live_check);
+void output_check_banner(void);
+void check_and_dump_old_cluster(void);
 void check_new_cluster(void);
 void report_clusters_compatible(void);
 void issue_warnings_and_set_wal_level(void);
 void output_completion_banner(char *deletion_script_file_name);
 void check_cluster_versions(void);
-void check_cluster_compatibility(bool live_check);
+void check_cluster_compatibility(void);
 void create_script_for_old_cluster_deletion(char **deletion_script_file_name);


 /* controldata.c */

-void get_control_data(ClusterInfo *cluster, bool live_check);
+void get_control_data(ClusterInfo *cluster);
 void check_control_data(ControlData *oldctrl, ControlData *newctrl);
 void disable_old_cluster(void);

@@ -428,7 +429,7 @@ void check_loadable_libraries(void);
 FileNameMap *gen_db_file_maps(DbInfo *old_db,
 							  DbInfo *new_db, int *nmaps, const char *old_pgdata,
 							  const char *new_pgdata);
-void get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check);
+void get_db_rel_and_slot_infos(ClusterInfo *cluster);
 int count_old_cluster_logical_slots(void);
 int count_old_cluster_subscriptions(void);

@@ -436,7 +437,7 @@ int count_old_cluster_subscriptions(void);

 void parseCommandLine(int argc, char *argv[]);
 void adjust_data_dir(ClusterInfo *cluster);
-void get_sock_dir(ClusterInfo *cluster, bool live_check);
+void get_sock_dir(ClusterInfo *cluster);

 /* relfilenumber.c */

-- 
2.39.3 (Apple Git-146)

From 48943ad85f83ba44ea01e4b1fdd5c4afc53552e3 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 21:09:33 -0500
Subject: [PATCH v2 4/6] use new pg_upgrade async API for retrieving relinfos

---
 src/bin/pg_upgrade/info.c | 187 +++++++++++++++++---------------------
 1 file changed, 81 insertions(+), 106 deletions(-)

diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 8f1777de59..d07255bd0a 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -22,13 +22,16 @@ static void report_unmatched_relation(const RelInfo *rel, const DbInfo *db,
 static void free_db_and_rel_infos(DbInfoArr *db_arr);
 static void get_template0_info(ClusterInfo *cluster);
 static void get_db_infos(ClusterInfo *cluster);
-static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
+static char *get_rel_infos_query(DbInfo *dbinfo, void *arg);
+static void get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg);
 static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
 static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo);
-static void get_db_subscription_count(DbInfo *dbinfo);
+static char *get_old_cluster_logical_slot_infos_query(DbInfo *dbinfo, void *arg);
+static void get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult *res, void *arg);
+static char *get_db_subscription_count_query(DbInfo *dbinfo, void *arg);
+static void get_db_subscription_count_result(DbInfo *dbinfo, PGresult *res, void *arg);


 /*
@@ -277,7 +280,7 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
 void
 get_db_rel_and_slot_infos(ClusterInfo *cluster)
 {
-	int dbnum;
+	AsyncTask *task = async_task_create();

 	if (cluster->dbarr.dbs != NULL)
 		free_db_and_rel_infos(&cluster->dbarr);
@@ -285,23 +288,26 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster)
 	get_template0_info(cluster);
 	get_db_infos(cluster);

-	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+	async_task_add_step(task,
+						get_rel_infos_query,
+						get_rel_infos_result,
+						true, NULL);
+	if (cluster == &old_cluster &&
+		GET_MAJOR_VERSION(cluster->major_version) > 1600)
 	{
-		DbInfo *pDbInfo = &cluster->dbarr.dbs[dbnum];
-
-		get_rel_infos(cluster, pDbInfo);
-
-		/*
-		 * Retrieve the logical replication slots infos and the subscriptions
-		 * count for the old cluster.
-		 */
-		if (cluster == &old_cluster)
-		{
-			get_old_cluster_logical_slot_infos(pDbInfo);
-			get_db_subscription_count(pDbInfo);
-		}
+		async_task_add_step(task,
+							get_old_cluster_logical_slot_infos_query,
+							get_old_cluster_logical_slot_infos_result,
+							true, cluster);
+		async_task_add_step(task,
+							get_db_subscription_count_query,
+							get_db_subscription_count_result,
+							true, cluster);
 	}

+	async_task_run(task, cluster);
+	async_task_free(task);
+
 	if (cluster == &old_cluster)
 		pg_log(PG_VERBOSE, "\nsource databases:");
 	else
@@ -447,30 +453,10 @@ get_db_infos(ClusterInfo *cluster)
  * Note: the resulting RelInfo array is assumed to be sorted by OID.
  * This allows later processing to match up old and new databases efficiently.
  */
-static void
-get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
+static char *
+get_rel_infos_query(DbInfo *dbinfo, void *arg)
 {
-	PGconn *conn = connectToServer(cluster,
-								   dbinfo->db_name);
-	PGresult *res;
-	RelInfo *relinfos;
-	int ntups;
-	int relnum;
-	int num_rels = 0;
-	char *nspname = NULL;
-	char *relname = NULL;
-	char *tablespace = NULL;
-	int i_spclocation,
-		i_nspname,
-		i_relname,
-		i_reloid,
-		i_indtable,
-		i_toastheap,
-		i_relfilenumber,
-		i_reltablespace;
-	char query[QUERY_ALLOC];
-	char *last_namespace = NULL,
-		*last_tablespace = NULL;
+	char *query = pg_malloc(QUERY_ALLOC);

 	query[0] = '\0'; /* initialize query string to empty */

@@ -484,7 +470,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 	 * output, so we have to copy that system table. It's easiest to do that
 	 * by treating it as a user table.
 	 */
-	snprintf(query + strlen(query), sizeof(query) - strlen(query),
+	snprintf(query + strlen(query), QUERY_ALLOC - strlen(query),
 			 "WITH regular_heap (reloid, indtable, toastheap) AS ( "
 			 " SELECT c.oid, 0::oid, 0::oid "
 			 " FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n "
@@ -506,7 +492,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 	 * selected by the regular_heap CTE. (We have to do this separately
 	 * because the namespace-name rules above don't work for toast tables.)
 	 */
-	snprintf(query + strlen(query), sizeof(query) - strlen(query),
+	snprintf(query + strlen(query), QUERY_ALLOC - strlen(query),
 			 " toast_heap (reloid, indtable, toastheap) AS ( "
 			 " SELECT c.reltoastrelid, 0::oid, c.oid "
 			 " FROM regular_heap JOIN pg_catalog.pg_class c "
@@ -519,7 +505,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 	 * Testing indisready is necessary in 9.2, and harmless in earlier/later
 	 * versions.
 	 */
-	snprintf(query + strlen(query), sizeof(query) - strlen(query),
+	snprintf(query + strlen(query), QUERY_ALLOC - strlen(query),
 			 " all_index (reloid, indtable, toastheap) AS ( "
 			 " SELECT indexrelid, indrelid, 0::oid "
 			 " FROM pg_catalog.pg_index "
@@ -533,7 +519,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 	 * And now we can write the query that retrieves the data we want for each
 	 * heap and index relation. Make sure result is sorted by OID.
 	 */
-	snprintf(query + strlen(query), sizeof(query) - strlen(query),
+	snprintf(query + strlen(query), QUERY_ALLOC - strlen(query),
 			 "SELECT all_rels.*, n.nspname, c.relname, "
 			 " c.relfilenode, c.reltablespace, "
 			 " pg_catalog.pg_tablespace_location(t.oid) AS spclocation "
@@ -550,22 +536,30 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 			 " ON c.reltablespace = t.oid "
 			 "ORDER BY 1;");

-	res = executeQueryOrDie(conn, "%s", query);
-
-	ntups = PQntuples(res);
-
-	relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+	return query;
+}

-	i_reloid = PQfnumber(res, "reloid");
-	i_indtable = PQfnumber(res, "indtable");
-	i_toastheap = PQfnumber(res, "toastheap");
-	i_nspname = PQfnumber(res, "nspname");
-	i_relname = PQfnumber(res, "relname");
-	i_relfilenumber = PQfnumber(res, "relfilenode");
-	i_reltablespace = PQfnumber(res, "reltablespace");
-	i_spclocation = PQfnumber(res, "spclocation");
+static void
+get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+	int ntups = PQntuples(res);
+	RelInfo *relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+	int i_reloid = PQfnumber(res, "reloid");
+	int i_indtable = PQfnumber(res, "indtable");
+	int i_toastheap = PQfnumber(res, "toastheap");
+	int i_nspname = PQfnumber(res, "nspname");
+	int i_relname = PQfnumber(res, "relname");
+	int i_relfilenumber = PQfnumber(res, "relfilenode");
+	int i_reltablespace = PQfnumber(res, "reltablespace");
+	int i_spclocation = PQfnumber(res, "spclocation");
+	int num_rels = 0;
+	char *nspname = NULL;
+	char *relname = NULL;
+	char *tablespace = NULL;
+	char *last_namespace = NULL;
+	char *last_tablespace = NULL;

-	for (relnum = 0; relnum < ntups; relnum++)
+	for (int relnum = 0; relnum < ntups; relnum++)
 	{
 		RelInfo *curr = &relinfos[num_rels++];

@@ -618,9 +612,6 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 			/* A zero reltablespace oid indicates the database tablespace. */
 			curr->tablespace = dbinfo->db_tablespace;
 	}
-	PQclear(res);
-
-	PQfinish(conn);

 	dbinfo->rel_arr.rels = relinfos;
 	dbinfo->rel_arr.nrels = num_rels;
@@ -642,20 +633,9 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
  * check_old_cluster_for_valid_slots() would raise a FATAL error if such slots
  * are included.
  */
-static void
-get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
+static char *
+get_old_cluster_logical_slot_infos_query(DbInfo *dbinfo, void *arg)
 {
-	PGconn *conn;
-	PGresult *res;
-	LogicalSlotInfo *slotinfos = NULL;
-	int num_slots;
-
-	/* Logical slots can be migrated since PG17. */
-	if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
-		return;
-
-	conn = connectToServer(&old_cluster, dbinfo->db_name);
-
 	/*
 	 * Fetch the logical replication slot information. The check whether the
 	 * slot is considered caught up is done by an upgrade function. This
@@ -673,18 +653,23 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
 	 * started and stopped several times causing any temporary slots to be
 	 * removed.
 	 */
-	res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, "
-							"%s as caught_up, invalidation_reason IS NOT NULL as invalid "
-							"FROM pg_catalog.pg_replication_slots "
-							"WHERE slot_type = 'logical' AND "
-							"database = current_database() AND "
-							"temporary IS FALSE;",
-							user_opts.live_check ? "FALSE" :
-							"(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
-							"ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
-							"END)");
-
-	num_slots = PQntuples(res);
+	return psprintf("SELECT slot_name, plugin, two_phase, failover, "
+					"%s as caught_up, invalidation_reason IS NOT NULL as invalid "
+					"FROM pg_catalog.pg_replication_slots "
+					"WHERE slot_type = 'logical' AND "
+					"database = current_database() AND "
+					"temporary IS FALSE;",
+					user_opts.live_check ? "FALSE" :
+					"(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
+					"ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
+					"END)");
+}
+
+static void
+get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+	LogicalSlotInfo *slotinfos = NULL;
+	int num_slots = PQntuples(res);

 	if (num_slots)
 	{
@@ -717,14 +702,10 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
 		}
 	}

-	PQclear(res);
-	PQfinish(conn);
-
 	dbinfo->slot_arr.slots = slotinfos;
 	dbinfo->slot_arr.nslots = num_slots;
 }

-
 /*
  * count_old_cluster_logical_slots()
  *
@@ -754,24 +735,18 @@ count_old_cluster_logical_slots(void)
  * This is because before that the logical slots are not upgraded, so we will
  * not be able to upgrade the logical replication clusters completely.
  */
-static void
-get_db_subscription_count(DbInfo *dbinfo)
+static char *
+get_db_subscription_count_query(DbInfo *dbinfo, void *arg)
 {
-	PGconn *conn;
-	PGresult *res;
-
-	/* Subscriptions can be migrated since PG17. */
-	if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700)
-		return;
+	return psprintf("SELECT count(*) "
+					"FROM pg_catalog.pg_subscription WHERE subdbid = %u",
+					dbinfo->db_oid);
+}

-	conn = connectToServer(&old_cluster, dbinfo->db_name);
-	res = executeQueryOrDie(conn, "SELECT count(*) "
-							"FROM pg_catalog.pg_subscription WHERE subdbid = %u",
-							dbinfo->db_oid);
+static void
+get_db_subscription_count_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
 	dbinfo->nsubs = atoi(PQgetvalue(res, 0, 0));
-
-	PQclear(res);
-	PQfinish(conn);
 }

 /*
-- 
2.39.3 (Apple Git-146)

From 09e7e7baa8c277a3afbed1e2f8d05bfa7fcc586c Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 21:24:35 -0500
Subject: [PATCH v2 5/6] use new pg_upgrade async API to parallelize getting loadable libraries

---
 src/bin/pg_upgrade/function.c | 63 ++++++++++++++++++++---------------
 1 file changed, 37 insertions(+), 26 deletions(-)

diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c
index 7e3abed098..c11fce0696 100644
--- a/src/bin/pg_upgrade/function.c
+++ b/src/bin/pg_upgrade/function.c
@@ -42,6 +42,32 @@ library_name_compare(const void *p1, const void *p2)
 					  ((const LibraryInfo *) p2)->dbnum);
 }

+struct loadable_libraries_state
+{
+	PGresult **ress;
+	int totaltups;
+};
+
+static char *
+get_loadable_libraries_query(DbInfo *dbinfo, void *arg)
+{
+	return psprintf("SELECT DISTINCT probin "
+					"FROM pg_catalog.pg_proc "
+					"WHERE prolang = %u AND "
+					"probin IS NOT NULL AND "
+					"oid >= %u;",
+					ClanguageId,
+					FirstNormalObjectId);
+}
+
+static void
+get_loadable_libraries_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+	struct loadable_libraries_state *state = (struct loadable_libraries_state *) arg;
+
+	state->ress[dbinfo - old_cluster.dbarr.dbs] = res;
+	state->totaltups += PQntuples(res);
+}

 /*
  * get_loadable_libraries()
@@ -54,47 +80,32 @@ library_name_compare(const void *p1, const void *p2)
 void
 get_loadable_libraries(void)
 {
-	PGresult **ress;
 	int totaltups;
 	int dbnum;
 	int n_libinfos;
+	AsyncTask *task = async_task_create();
+	struct loadable_libraries_state state;

-	ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
-	totaltups = 0;
+	state.ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
+	state.totaltups = 0;

-	/* Fetch all library names, removing duplicates within each DB */
-	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
-	{
-		DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum];
-		PGconn *conn = connectToServer(&old_cluster, active_db->db_name);
+	async_task_add_step(task, get_loadable_libraries_query,
+						get_loadable_libraries_result, false, &state);

-		/*
-		 * Fetch all libraries containing non-built-in C functions in this DB.
-		 */
-		ress[dbnum] = executeQueryOrDie(conn,
-										"SELECT DISTINCT probin "
-										"FROM pg_catalog.pg_proc "
-										"WHERE prolang = %u AND "
-										"probin IS NOT NULL AND "
-										"oid >= %u;",
-										ClanguageId,
-										FirstNormalObjectId);
-		totaltups += PQntuples(ress[dbnum]);
-
-		PQfinish(conn);
-	}
+	async_task_run(task, &old_cluster);
+	async_task_free(task);

 	/*
 	 * Allocate memory for required libraries and logical replication output
 	 * plugins.
 	 */
-	n_libinfos = totaltups + count_old_cluster_logical_slots();
+	n_libinfos = state.totaltups + count_old_cluster_logical_slots();
 	os_info.libraries = (LibraryInfo *) pg_malloc(sizeof(LibraryInfo) * n_libinfos);
 	totaltups = 0;

 	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
 	{
-		PGresult *res = ress[dbnum];
+		PGresult *res = state.ress[dbnum];
 		int ntups;
 		int rowno;
 		LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
@@ -129,7 +140,7 @@ get_loadable_libraries(void)
 		}
 	}

-	pg_free(ress);
+	pg_free(state.ress);

 	os_info.num_libraries = totaltups;
 }
-- 
2.39.3 (Apple Git-146)

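A note on the `dbinfo - old_cluster.dbarr.dbs` expression in get_loadable_libraries_result(): the callback only receives a pointer to the current DbInfo, so it recovers the database's array index by pointer subtraction and files the result in the matching slot. That keeps the later per-database loop working even if results arrive in a different order than the databases are listed. Here is a toy, standalone sketch of just that idea. ToyDbInfo, ToyState, toy_dbs and the callback signature are made up for illustration and are not the patch's types; it also assumes, like the patch, that every pointer passed in really points into the one known array.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for DbInfo; only here to have an array to index. */
typedef struct ToyDbInfo
{
	const char *db_name;
} ToyDbInfo;

/* Hypothetical stand-in for struct loadable_libraries_state. */
typedef struct ToyState
{
	int		   *ntups_by_db;	/* one slot per database */
	int			totaltups;
} ToyState;

static ToyDbInfo toy_dbs[3] = {{"postgres"}, {"template1"}, {"app"}};

/* Result callback: files the row count under the database's array index. */
static void
toy_result_cb(ToyDbInfo *dbinfo, int ntups, void *arg)
{
	ToyState   *state = (ToyState *) arg;
	ptrdiff_t	dbnum = dbinfo - toy_dbs;	/* index via pointer subtraction */

	assert(dbnum >= 0 && dbnum < 3);
	state->ntups_by_db[dbnum] = ntups;
	state->totaltups += ntups;
}

/* Delivers results out of order and encodes what ended up where. */
static int
toy_index_demo(void)
{
	int			slots[3] = {0, 0, 0};
	ToyState	state = {slots, 0};

	toy_result_cb(&toy_dbs[2], 5, &state);
	toy_result_cb(&toy_dbs[0], 1, &state);
	toy_result_cb(&toy_dbs[1], 0, &state);

	return slots[2] * 100 + slots[0] * 10 + state.totaltups;
}
```

The subtraction is only meaningful when the pointer and the base belong to the same array, which is why the real callback hard-codes old_cluster and the task is run against &old_cluster.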
From 7a420ff039d48c54cbb4d06647f039257a807bb9 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Fri, 28 Jun 2024 21:31:57 -0500
Subject: [PATCH v2 6/6] use new pg_upgrade async API to parallelize reporting extension updates

---
 src/bin/pg_upgrade/version.c | 82 ++++++++++++++++++------------------
 1 file changed, 41 insertions(+), 41 deletions(-)

diff --git a/src/bin/pg_upgrade/version.c b/src/bin/pg_upgrade/version.c
index 2de6dffccd..12783bb2ba 100644
--- a/src/bin/pg_upgrade/version.c
+++ b/src/bin/pg_upgrade/version.c
@@ -139,6 +139,42 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
 		check_ok();
 }

+static char *
+report_extension_updates_query(DbInfo *dbinfo, void *arg)
+{
+	return pg_strdup("SELECT name "
+					 "FROM pg_available_extensions "
+					 "WHERE installed_version != default_version");
+}
+
+static void
+report_extension_updates_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+	bool db_used = false;
+	int ntups = PQntuples(res);
+	int i_name = PQfnumber(res, "name");
+	char *output_path = "update_extensions.sql";
+	FILE **script = (FILE **) arg;
+
+	for (int rowno = 0; rowno < ntups; rowno++)
+	{
+		if (*script == NULL && (*script = fopen_priv(output_path, "w")) == NULL)
+			pg_fatal("could not open file \"%s\": %m", output_path);
+		if (!db_used)
+		{
+			PQExpBufferData connectbuf;
+
+			initPQExpBuffer(&connectbuf);
+			appendPsqlMetaConnect(&connectbuf, dbinfo->db_name);
+			fputs(connectbuf.data, *script);
+			termPQExpBuffer(&connectbuf);
+			db_used = true;
+		}
+		fprintf(*script, "ALTER EXTENSION %s UPDATE;\n",
+				quote_identifier(PQgetvalue(res, rowno, i_name)));
+	}
+}
+
 /*
  * report_extension_updates()
  *	Report extensions that should be updated.
@@ -146,53 +182,17 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
 void
 report_extension_updates(ClusterInfo *cluster)
 {
-	int dbnum;
 	FILE *script = NULL;
 	char *output_path = "update_extensions.sql";
+	AsyncTask *task = async_task_create();

 	prep_status("Checking for extension updates");

-	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-	{
-		PGresult *res;
-		bool db_used = false;
-		int ntups;
-		int rowno;
-		int i_name;
-		DbInfo *active_db = &cluster->dbarr.dbs[dbnum];
-		PGconn *conn = connectToServer(cluster, active_db->db_name);
-
-		/* find extensions needing updates */
-		res = executeQueryOrDie(conn,
-								"SELECT name "
-								"FROM pg_available_extensions "
-								"WHERE installed_version != default_version"
-			);
-
-		ntups = PQntuples(res);
-		i_name = PQfnumber(res, "name");
-		for (rowno = 0; rowno < ntups; rowno++)
-		{
-			if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-				pg_fatal("could not open file \"%s\": %m", output_path);
-			if (!db_used)
-			{
-				PQExpBufferData connectbuf;
-
-				initPQExpBuffer(&connectbuf);
-				appendPsqlMetaConnect(&connectbuf, active_db->db_name);
-				fputs(connectbuf.data, script);
-				termPQExpBuffer(&connectbuf);
-				db_used = true;
-			}
-			fprintf(script, "ALTER EXTENSION %s UPDATE;\n",
-					quote_identifier(PQgetvalue(res, rowno, i_name)));
-		}
+	async_task_add_step(task, report_extension_updates_query,
+						report_extension_updates_result, true, &script);

-		PQclear(res);
-
-		PQfinish(conn);
-	}
+	async_task_run(task, cluster);
+	async_task_free(task);

 	if (script)
 	{
-- 
2.39.3 (Apple Git-146)

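For anyone skimming 0004-0006 without 0001 at hand, the conversions all follow the same shape: split the old per-database loop body into a callback that builds the query string and a callback that consumes the result, then register the pair as a step and run the task. Below is a toy, standalone model of that shape with no libpq and no parallelism. Everything prefixed Toy/toy_ is hypothetical and is not the code in 0001; in particular, the runner here goes through databases sequentially, hands the query text itself to the result callback in place of a PGresult, and frees the returned string, which is an assumption about ownership suggested by the callbacks returning freshly allocated strings.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins; none of these are the patch's real definitions. */
typedef struct ToyDb
{
	const char *name;
	int			seen;			/* set by a result callback */
} ToyDb;

typedef char *(*ToyQueryCB) (ToyDb *db, void *arg);
typedef void (*ToyResultCB) (ToyDb *db, const char *result, void *arg);

#define TOY_MAX_STEPS 4

typedef struct ToyTask
{
	int			nsteps;
	ToyQueryCB	query_cbs[TOY_MAX_STEPS];
	ToyResultCB result_cbs[TOY_MAX_STEPS];
	void	   *args[TOY_MAX_STEPS];
} ToyTask;

/* Registers one query/result callback pair plus its opaque argument. */
static void
toy_task_add_step(ToyTask *task, ToyQueryCB q, ToyResultCB r, void *arg)
{
	assert(task->nsteps < TOY_MAX_STEPS);
	task->query_cbs[task->nsteps] = q;
	task->result_cbs[task->nsteps] = r;
	task->args[task->nsteps] = arg;
	task->nsteps++;
}

/* Runs every step against every database, one after another. */
static void
toy_task_run(ToyTask *task, ToyDb *dbs, int ndbs)
{
	for (int d = 0; d < ndbs; d++)
	{
		for (int s = 0; s < task->nsteps; s++)
		{
			char	   *query = task->query_cbs[s] (&dbs[d], task->args[s]);

			/* A real runner would send the query; the toy just echoes it. */
			task->result_cbs[s] (&dbs[d], query, task->args[s]);
			free(query);		/* assumed: the runner owns the string */
		}
	}
}

/* Query callback: returns a freshly allocated query string. */
static char *
toy_count_query(ToyDb *db, void *arg)
{
	const char *fmt = "SELECT count(*) FROM t_%s";
	size_t		len = strlen(fmt) + strlen(db->name) + 1;
	char	   *query = malloc(len);

	(void) arg;
	assert(query != NULL);
	snprintf(query, len, fmt, db->name);
	return query;
}

/* Result callback: records per-database and shared state. */
static void
toy_count_result(ToyDb *db, const char *result, void *arg)
{
	int		   *ncalls = (int *) arg;

	db->seen = (result != NULL);
	*ncalls += 1;
}

/* Returns how many result callbacks fired for two toy databases. */
static int
toy_demo(void)
{
	ToyDb		dbs[] = {{"a", 0}, {"bb", 0}};
	ToyTask		task = {0};
	int			ncalls = 0;

	toy_task_add_step(&task, toy_count_query, toy_count_result, &ncalls);
	toy_task_run(&task, dbs, 2);
	return ncalls;
}
```

The void *arg is what lets report_extension_updates() share one lazily opened FILE * across databases and get_loadable_libraries() collect per-database results, without the callbacks touching globals beyond what they already used.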