I spent some time cleaning up names, comments, etc. Barring additional feedback, I'm planning to commit this stuff in the September commitfest so that it has plenty of time to bake in the buildfarm.
-- nathan
>From c38137cd2cbc45c348e8e144f864a62c08bd7b7f Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Wed, 28 Aug 2024 10:45:59 -0500 Subject: [PATCH v10 01/11] Introduce framework for parallelizing various pg_upgrade tasks. A number of pg_upgrade steps require connecting to every database in the cluster and running the same query in each one. When there are many databases, these steps are particularly time-consuming, especially since these steps are performed sequentially in a single process. This commit introduces a new framework that makes it easy to parallelize most of these once-in-each-database tasks. Specifically, it manages a simple state machine of slots and uses libpq's asynchronous APIs to establish the connections and run the queries. The --jobs option is used to determine the number of slots to use. To use this new task framework, callers simply need to provide the query and a callback function to process its results, and the framework takes care of the rest. A more complete description is provided at the top of the new task.c file. None of the eligible once-in-each-database tasks are converted to use this new framework in this commit. That will be done via several follow-up commits. 
Reviewed-by: Jeff Davis, Robert Haas, Daniel Gustafsson, Ilya Gladyshev, Corey Huinker Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13 --- src/bin/pg_upgrade/Makefile | 1 + src/bin/pg_upgrade/meson.build | 1 + src/bin/pg_upgrade/pg_upgrade.h | 21 ++ src/bin/pg_upgrade/task.c | 426 +++++++++++++++++++++++++++++++ src/tools/pgindent/typedefs.list | 5 + 5 files changed, 454 insertions(+) create mode 100644 src/bin/pg_upgrade/task.c diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile index bde91e2beb..f83d2b5d30 100644 --- a/src/bin/pg_upgrade/Makefile +++ b/src/bin/pg_upgrade/Makefile @@ -25,6 +25,7 @@ OBJS = \ relfilenumber.o \ server.o \ tablespace.o \ + task.o \ util.o \ version.o diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build index 9825fa3305..3d88419674 100644 --- a/src/bin/pg_upgrade/meson.build +++ b/src/bin/pg_upgrade/meson.build @@ -14,6 +14,7 @@ pg_upgrade_sources = files( 'relfilenumber.c', 'server.c', 'tablespace.c', + 'task.c', 'util.c', 'version.c', ) diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index cdb6e2b759..53f693c2d4 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -494,3 +494,24 @@ void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr char *old_pgdata, char *new_pgdata, char *old_tablespace); bool reap_child(bool wait_for_child); + +/* task.c */ + +typedef void (*UpgradeTaskProcessCB) (DbInfo *dbinfo, PGresult *res, void *arg); + +/* struct definition is private to task.c */ +typedef struct UpgradeTask UpgradeTask; + +UpgradeTask *upgrade_task_create(void); +void upgrade_task_add_step(UpgradeTask *task, const char *query, + UpgradeTaskProcessCB process_cb, bool free_result, + void *arg); +void upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster); +void upgrade_task_free(UpgradeTask *task); + +/* convenient type for common private data needed by several tasks */ 
+typedef struct +{ + FILE *file; + char path[MAXPGPATH]; +} UpgradeTaskReport; diff --git a/src/bin/pg_upgrade/task.c b/src/bin/pg_upgrade/task.c new file mode 100644 index 0000000000..b5e2455ae2 --- /dev/null +++ b/src/bin/pg_upgrade/task.c @@ -0,0 +1,426 @@ +/* + * task.c + * framework for parallelizing pg_upgrade's once-in-each-database tasks + * + * This framework provides an efficient way of running the various + * once-in-each-database tasks required by pg_upgrade. Specifically, it + * parallelizes these tasks by managing a simple state machine of + * user_opts.jobs slots and using libpq's asynchronous APIs to establish the + * connections and run the queries. Callers simply need to create a callback + * function and build/execute an UpgradeTask. A simple example follows: + * + * static void + * my_process_cb(DbInfo *dbinfo, PGresult *res, void *arg) + * { + * for (int i = 0; i < PQntuples(res); i++) + * { + * ... process results ... + * } + * } + * + * void + * my_task(ClusterInfo *cluster) + * { + * UpgradeTask *task = upgrade_task_create(); + * + * upgrade_task_add_step(task, + * "... query text ...", + * my_process_cb, + * true, // let the task free the PGresult + * NULL); // "arg" pointer for callback + * upgrade_task_run(task, cluster); + * upgrade_task_free(task); + * } + * + * Note that multiple steps can be added to a given task. When there are + * multiple steps, the task will run all of the steps consecutively in the same + * database connection before freeing the connection and moving on. In other + * words, it only ever initiates one connection to each database in the + * cluster for a given run. + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * src/bin/pg_upgrade/task.c + */ + +#include "postgres_fe.h" + +#include "common/connect.h" +#include "fe_utils/string_utils.h" +#include "pg_upgrade.h" + +/* + * dbs_complete stores the number of databases that we have completed + * processing. 
When this value equals the number of databases in the cluster, + * the task is finished. + */ +static int dbs_complete; + +/* + * dbs_processing stores the index of the next database in the cluster's array + * of databases that will be picked up for processing. It will always be + * greater than or equal to dbs_complete. + */ +static int dbs_processing; + +/* + * This struct stores all the information for a single step of a task. All + * steps in a task are run in a single connection before moving on to the next + * database (which requires a new connection). + */ +typedef struct UpgradeTaskStep +{ + UpgradeTaskProcessCB process_cb; /* processes the results of the query */ + const char *query; /* query text */ + bool free_result; /* should we free the result? */ + void *arg; /* pointer passed to process_cb */ +} UpgradeTaskStep; + +/* + * This struct is a thin wrapper around an array of steps, i.e., + * UpgradeTaskStep. + */ +typedef struct UpgradeTask +{ + UpgradeTaskStep *steps; + int num_steps; +} UpgradeTask; + +/* + * The different states for a parallel slot. + */ +typedef enum +{ + FREE, /* slot available for use in a new database */ + CONNECTING, /* waiting for connection to be established */ + RUNNING_QUERIES, /* running/processing queries in the task */ +} UpgradeTaskSlotState; + +/* + * We maintain an array of user_opts.jobs slots to execute the task. + */ +typedef struct +{ + UpgradeTaskSlotState state; /* state of the slot */ + int db_idx; /* index of the database assigned to slot */ + int step_idx; /* index of the current step of task */ + PGconn *conn; /* current connection managed by slot */ +} UpgradeTaskSlot; + +/* + * Initializes an UpgradeTask. + */ +UpgradeTask * +upgrade_task_create(void) +{ + UpgradeTask *task = pg_malloc0(sizeof(UpgradeTask)); + + /* All tasks must first set a secure search_path. 
*/ + upgrade_task_add_step(task, ALWAYS_SECURE_SEARCH_PATH_SQL, NULL, true, NULL); + return task; +} + +/* + * Frees all storage associated with an UpgradeTask. + */ +void +upgrade_task_free(UpgradeTask *task) +{ + if (task->steps) + pg_free(task->steps); + + pg_free(task); +} + +/* + * Adds a step to an UpgradeTask. The steps will be executed in each database + * in the order in which they are added. + * + * task: task object that must have been initialized via upgrade_task_create() + * query: the query text + * process_cb: function that processes the results of the query + * free_result: should we free the PGresult, or leave it to the caller? + * arg: pointer to task-specific data that is passed to each callback + */ +void +upgrade_task_add_step(UpgradeTask *task, const char *query, + UpgradeTaskProcessCB process_cb, bool free_result, + void *arg) +{ + UpgradeTaskStep *new_step; + + task->steps = pg_realloc(task->steps, + ++task->num_steps * sizeof(UpgradeTaskStep)); + + new_step = &task->steps[task->num_steps - 1]; + new_step->process_cb = process_cb; + new_step->query = query; + new_step->free_result = free_result; + new_step->arg = arg; +} + +/* + * Build a connection string for the slot's current database and asynchronously + * start a new connection, but do not wait for the connection to be + * established. 
+ */ +static void +start_conn(const ClusterInfo *cluster, UpgradeTaskSlot *slot) +{ + PQExpBufferData conn_opts; + DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx]; + + /* Build connection string with proper quoting */ + initPQExpBuffer(&conn_opts); + appendPQExpBufferStr(&conn_opts, "dbname="); + appendConnStrVal(&conn_opts, dbinfo->db_name); + appendPQExpBufferStr(&conn_opts, " user="); + appendConnStrVal(&conn_opts, os_info.user); + appendPQExpBuffer(&conn_opts, " port=%d", cluster->port); + if (cluster->sockdir) + { + appendPQExpBufferStr(&conn_opts, " host="); + appendConnStrVal(&conn_opts, cluster->sockdir); + } + + slot->conn = PQconnectStart(conn_opts.data); + + if (!slot->conn) + pg_fatal("failed to create connection with connection string: \"%s\"", + conn_opts.data); + + termPQExpBuffer(&conn_opts); +} + +/* + * Run the process_cb callback function to process the result of a query, and + * free the result if the caller indicated we should do so. + */ +static void +process_query_result(const ClusterInfo *cluster, UpgradeTaskSlot *slot, + const UpgradeTask *task) +{ + UpgradeTaskStep *steps = &task->steps[slot->step_idx]; + UpgradeTaskProcessCB process_cb = steps->process_cb; + DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx]; + PGresult *res = PQgetResult(slot->conn); + + if (PQstatus(slot->conn) == CONNECTION_BAD || + (PQresultStatus(res) != PGRES_TUPLES_OK && + PQresultStatus(res) != PGRES_COMMAND_OK)) + pg_fatal("connection failure: %s", PQerrorMessage(slot->conn)); + + /* + * We assume that a NULL process_cb callback function means there's + * nothing to process. This is primarily intended for the initial step in + * every task that sets a safe search_path. + */ + if (process_cb) + (*process_cb) (dbinfo, res, steps->arg); + + if (steps->free_result) + PQclear(res); +} + +/* + * Advances the state machine for a given slot as necessary. 
+ */ +static void +process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task) +{ + PQExpBufferData queries; + + switch (slot->state) + { + case FREE: + + /* + * If all of the databases in the cluster have been processed or + * are currently being processed by other slots, we are done. + */ + if (dbs_processing >= cluster->dbarr.ndbs) + return; + + /* + * Claim the next database in the cluster's array and initiate a + * new connection. + */ + slot->db_idx = dbs_processing++; + slot->state = CONNECTING; + start_conn(cluster, slot); + + return; + + case CONNECTING: + + /* Check for connection failure. */ + if (PQconnectPoll(slot->conn) == PGRES_POLLING_FAILED) + pg_fatal("connection failure: %s", PQerrorMessage(slot->conn)); + + /* Check whether the connection is still establishing. */ + if (PQconnectPoll(slot->conn) != PGRES_POLLING_OK) + return; + + /* + * Move on to running/processing the queries in the task. We + * combine all the queries and send them to the server together. + */ + slot->state = RUNNING_QUERIES; + initPQExpBuffer(&queries); + for (int i = 0; i < task->num_steps; i++) + appendPQExpBuffer(&queries, "%s;", task->steps[i].query); + if (!PQsendQuery(slot->conn, queries.data)) + pg_fatal("connection failure: %s", PQerrorMessage(slot->conn)); + termPQExpBuffer(&queries); + + return; + + case RUNNING_QUERIES: + + /* + * Consume any available data and clear the read-ready indicator + * for the connection. + */ + if (!PQconsumeInput(slot->conn)) + pg_fatal("connection failure: %s", PQerrorMessage(slot->conn)); + + /* + * Process any results that are ready so that we can free up this + * slot for another database as soon as possible. + */ + for (; slot->step_idx < task->num_steps; slot->step_idx++) + { + /* If no more results are available yet, move on. 
*/ + if (PQisBusy(slot->conn)) + return; + + process_query_result(cluster, slot, task); + } + + /* + * If we just finished processing the result of the last step in + * the task, free the slot. We recursively call this function on + * the newly-freed slot so that we can start initiating the next + * connection immediately instead of waiting for the next loop + * through the slots. + */ + dbs_complete++; + (void) PQgetResult(slot->conn); + PQfinish(slot->conn); + memset(slot, 0, sizeof(UpgradeTaskSlot)); + + process_slot(cluster, slot, task); + + return; + } +} + +/* + * Wait on the slots to either finish connecting or to receive query results if + * possible. This avoids a tight loop in upgrade_task_run(). + */ +static void +wait_on_slots(UpgradeTaskSlot *slots, int numslots) +{ + fd_set input_mask; + fd_set output_mask; + fd_set except_mask; + int maxFd = 0; + + FD_ZERO(&input_mask); + FD_ZERO(&output_mask); + FD_ZERO(&except_mask); + + for (int i = 0; i < numslots; i++) + { + int sock; + bool read = false; + + switch (slots[i].state) + { + case FREE: + + /* + * This function should only ever see free slots as we are + * finishing processing the last few databases, at which point + * we don't have any databases left for them to process. We'll + * never use these slots again, so we can safely ignore them. + */ + continue; + + case CONNECTING: + + /* + * If we are waiting for the connection to establish, choose + * whether to wait for reading or for writing on the socket as + * appropriate. If neither apply, just return immediately so + * that we can handle the slot. + */ + { + PostgresPollingStatusType status; + + status = PQconnectPoll(slots[i].conn); + if (status == PGRES_POLLING_READING) + read = true; + else if (status != PGRES_POLLING_WRITING) + return; + } + break; + + case RUNNING_QUERIES: + + /* + * Once we've sent the queries, we must wait for the socket to + * be read-ready. Note that process_slot() handles calling + * PQconsumeInput() as required. 
+ */ + read = true; + break; + } + + /* + * If there's some problem retrieving the socket, just pretend this + * slot doesn't exist. We don't expect this to happen regularly in + * practice, so it seems unlikely to cause too much harm. + */ + sock = PQsocket(slots[i].conn); + if (sock < 0) + continue; + + /* + * Add the socket to the set. + */ + FD_SET(sock, read ? &input_mask : &output_mask); + FD_SET(sock, &except_mask); + maxFd = Max(maxFd, sock); + } + + /* + * If we found socket(s) to wait on, wait. + */ + if (maxFd != 0) + (void) select(maxFd + 1, &input_mask, &output_mask, &except_mask, NULL); +} + +/* + * Runs all the steps of the task in every database in the cluster using + * user_opts.jobs parallel slots. + */ +void +upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster) +{ + int jobs = Max(1, user_opts.jobs); + UpgradeTaskSlot *slots = pg_malloc0(sizeof(UpgradeTaskSlot) * jobs); + + dbs_complete = 0; + dbs_processing = 0; + + while (dbs_complete < cluster->dbarr.ndbs) + { + for (int i = 0; i < jobs; i++) + process_slot(cluster, &slots[i], task); + + wait_on_slots(slots, jobs); + } + + pg_free(slots); +} diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 9e951a9e6f..43c0f9f85b 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3039,6 +3039,11 @@ UnresolvedTup UnresolvedTupData UpdateContext UpdateStmt +UpgradeTask +UpgradeTaskReport +UpgradeTaskSlot +UpgradeTaskSlotState +UpgradeTaskStep UploadManifestCmd UpperRelationKind UpperUniquePath -- 2.39.3 (Apple Git-146)
>From a1f2ff2ea0a6f9e0eb9af0ec1e400dbc048079cb Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Fri, 28 Jun 2024 17:21:19 -0500 Subject: [PATCH v10 02/11] Use pg_upgrade's new parallel framework for subscription checks. Reviewed-by: FIXME Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13 --- src/bin/pg_upgrade/check.c | 206 ++++++++++++++++++++----------------- 1 file changed, 111 insertions(+), 95 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 96adea41e9..f8160e0140 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1905,6 +1905,38 @@ check_old_cluster_for_valid_slots(void) check_ok(); } +/* + * Callback function for processing results of query for + * check_old_cluster_subscription_state()'s UpgradeTask. If the query returned + * any rows (i.e., the check failed), write the details to the report file. + */ +static void +process_old_sub_state_check(DbInfo *dbinfo, PGresult *res, void *arg) +{ + UpgradeTaskReport *report = (UpgradeTaskReport *) arg; + int ntup = PQntuples(res); + int i_srsubstate = PQfnumber(res, "srsubstate"); + int i_subname = PQfnumber(res, "subname"); + int i_nspname = PQfnumber(res, "nspname"); + int i_relname = PQfnumber(res, "relname"); + + AssertVariableIsOfType(&process_old_sub_state_check, UpgradeTaskProcessCB); + + for (int i = 0; i < ntup; i++) + { + if (report->file == NULL && + (report->file = fopen_priv(report->path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", report->path); + + fprintf(report->file, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n", + PQgetvalue(res, i, i_srsubstate), + dbinfo->db_name, + PQgetvalue(res, i, i_subname), + PQgetvalue(res, i, i_nspname), + PQgetvalue(res, i, i_relname)); + } +} + /* * check_old_cluster_subscription_state() * @@ -1915,115 +1947,99 @@ check_old_cluster_for_valid_slots(void) static void 
check_old_cluster_subscription_state(void) { - FILE *script = NULL; - char output_path[MAXPGPATH]; + UpgradeTask *task = upgrade_task_create(); + UpgradeTaskReport report; + const char *query; + PGresult *res; + PGconn *conn; int ntup; prep_status("Checking for subscription state"); - snprintf(output_path, sizeof(output_path), "%s/%s", + report.file = NULL; + snprintf(report.path, sizeof(report.path), "%s/%s", log_opts.basedir, "subs_invalid.txt"); - for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) - { - PGresult *res; - DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(&old_cluster, active_db->db_name); - - /* We need to check for pg_replication_origin only once. */ - if (dbnum == 0) - { - /* - * Check that all the subscriptions have their respective - * replication origin. - */ - res = executeQueryOrDie(conn, - "SELECT d.datname, s.subname " - "FROM pg_catalog.pg_subscription s " - "LEFT OUTER JOIN pg_catalog.pg_replication_origin o " - " ON o.roname = 'pg_' || s.oid " - "INNER JOIN pg_catalog.pg_database d " - " ON d.oid = s.subdbid " - "WHERE o.roname IS NULL;"); - - ntup = PQntuples(res); - for (int i = 0; i < ntup; i++) - { - if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - fprintf(script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n", - PQgetvalue(res, i, 0), - PQgetvalue(res, i, 1)); - } - PQclear(res); - } - - /* - * We don't allow upgrade if there is a risk of dangling slot or - * origin corresponding to initial sync after upgrade. - * - * A slot/origin not created yet refers to the 'i' (initialize) state, - * while 'r' (ready) state refers to a slot/origin created previously - * but already dropped. These states are supported for pg_upgrade. 
The - * other states listed below are not supported: - * - * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state - * would retain a replication slot, which could not be dropped by the - * sync worker spawned after the upgrade because the subscription ID - * used for the slot name won't match anymore. - * - * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state - * would retain the replication origin when there is a failure in - * tablesync worker immediately after dropping the replication slot in - * the publisher. - * - * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on - * a relation upgraded while in this state would expect an origin ID - * with the OID of the subscription used before the upgrade, causing - * it to fail. - * - * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and - * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog, - * so we need not allow these states. - */ - res = executeQueryOrDie(conn, - "SELECT r.srsubstate, s.subname, n.nspname, c.relname " - "FROM pg_catalog.pg_subscription_rel r " - "LEFT JOIN pg_catalog.pg_subscription s" - " ON r.srsubid = s.oid " - "LEFT JOIN pg_catalog.pg_class c" - " ON r.srrelid = c.oid " - "LEFT JOIN pg_catalog.pg_namespace n" - " ON c.relnamespace = n.oid " - "WHERE r.srsubstate NOT IN ('i', 'r') " - "ORDER BY s.subname"); - - ntup = PQntuples(res); - for (int i = 0; i < ntup; i++) - { - if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - - fprintf(script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n", - PQgetvalue(res, i, 0), - active_db->db_name, - PQgetvalue(res, i, 1), - PQgetvalue(res, i, 2), - PQgetvalue(res, i, 3)); - } - PQclear(res); - PQfinish(conn); + /* + * Check that all the subscriptions have their respective replication + * origin. This check only needs to run once. 
+ */ + conn = connectToServer(&old_cluster, old_cluster.dbarr.dbs[0].db_name); + res = executeQueryOrDie(conn, + "SELECT d.datname, s.subname " + "FROM pg_catalog.pg_subscription s " + "LEFT OUTER JOIN pg_catalog.pg_replication_origin o " + " ON o.roname = 'pg_' || s.oid " + "INNER JOIN pg_catalog.pg_database d " + " ON d.oid = s.subdbid " + "WHERE o.roname IS NULL;"); + ntup = PQntuples(res); + for (int i = 0; i < ntup; i++) + { + if (report.file == NULL && + (report.file = fopen_priv(report.path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", report.path); + fprintf(report.file, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n", + PQgetvalue(res, i, 0), + PQgetvalue(res, i, 1)); } + PQclear(res); + PQfinish(conn); - if (script) + /* + * We don't allow upgrade if there is a risk of dangling slot or origin + * corresponding to initial sync after upgrade. + * + * A slot/origin not created yet refers to the 'i' (initialize) state, + * while 'r' (ready) state refers to a slot/origin created previously but + * already dropped. These states are supported for pg_upgrade. The other + * states listed below are not supported: + * + * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state would + * retain a replication slot, which could not be dropped by the sync + * worker spawned after the upgrade because the subscription ID used for + * the slot name won't match anymore. + * + * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state would + * retain the replication origin when there is a failure in tablesync + * worker immediately after dropping the replication slot in the + * publisher. + * + * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on a + * relation upgraded while in this state would expect an origin ID with + * the OID of the subscription used before the upgrade, causing it to + * fail. 
+ * + * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and + * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog, so we + * need not allow these states. + */ + query = "SELECT r.srsubstate, s.subname, n.nspname, c.relname " + "FROM pg_catalog.pg_subscription_rel r " + "LEFT JOIN pg_catalog.pg_subscription s" + " ON r.srsubid = s.oid " + "LEFT JOIN pg_catalog.pg_class c" + " ON r.srrelid = c.oid " + "LEFT JOIN pg_catalog.pg_namespace n" + " ON c.relnamespace = n.oid " + "WHERE r.srsubstate NOT IN ('i', 'r') " + "ORDER BY s.subname"; + + upgrade_task_add_step(task, query, process_old_sub_state_check, + true, &report); + + upgrade_task_run(task, &old_cluster); + upgrade_task_free(task); + + if (report.file) { - fclose(script); + fclose(report.file); pg_log(PG_REPORT, "fatal"); pg_fatal("Your installation contains subscriptions without origin or having relations not in i (initialize) or r (ready) state.\n" "You can allow the initial sync to finish for all relations and then restart the upgrade.\n" "A list of the problematic subscriptions is in the file:\n" - " %s", output_path); + " %s", report.path); } else check_ok(); -- 2.39.3 (Apple Git-146)
>From 67479ba2693eec0f16208b3e43a2d22cba38a15e Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Fri, 28 Jun 2024 21:09:33 -0500 Subject: [PATCH v10 03/11] Use pg_upgrade's new parallel framework to get relation info. Reviewed-by: FIXME Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13 --- src/bin/pg_upgrade/info.c | 296 ++++++++++++++++++++------------------ 1 file changed, 154 insertions(+), 142 deletions(-) diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index d3c1e8918d..2bfc8dcfba 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -11,6 +11,7 @@ #include "access/transam.h" #include "catalog/pg_class_d.h" +#include "pqexpbuffer.h" #include "pg_upgrade.h" static void create_rel_filename_map(const char *old_data, const char *new_data, @@ -22,12 +23,14 @@ static void report_unmatched_relation(const RelInfo *rel, const DbInfo *db, static void free_db_and_rel_infos(DbInfoArr *db_arr); static void get_template0_info(ClusterInfo *cluster); static void get_db_infos(ClusterInfo *cluster); -static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo); +static char *get_rel_infos_query(void); +static void process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg); static void free_rel_infos(RelInfoArr *rel_arr); static void print_db_infos(DbInfoArr *db_arr); static void print_rel_infos(RelInfoArr *rel_arr); static void print_slot_infos(LogicalSlotInfoArr *slot_arr); -static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo); +static char *get_old_cluster_logical_slot_infos_query(void); +static void process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg); /* @@ -276,7 +279,9 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db) void get_db_rel_and_slot_infos(ClusterInfo *cluster) { - int dbnum; + UpgradeTask *task = upgrade_task_create(); + char *rel_infos_query = NULL; + char *logical_slot_infos_query = NULL; if 
(cluster->dbarr.dbs != NULL) free_db_and_rel_infos(&cluster->dbarr); @@ -284,15 +289,37 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster) get_template0_info(cluster); get_db_infos(cluster); - for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) + rel_infos_query = get_rel_infos_query(); + upgrade_task_add_step(task, + rel_infos_query, + process_rel_infos, + true, NULL); + + /* + * Logical slots are only carried over to the new cluster when the old + * cluster is on PG17 or newer. This is because before that the logical + * slots are not saved at shutdown, so there is no guarantee that the + * latest confirmed_flush_lsn is saved to disk which can lead to data + * loss. It is still not guaranteed for manually created slots in PG17, so + * subsequent checks done in check_old_cluster_for_valid_slots() would + * raise a FATAL error if such slots are included. + */ + if (cluster == &old_cluster && + GET_MAJOR_VERSION(cluster->major_version) > 1600) { - DbInfo *pDbInfo = &cluster->dbarr.dbs[dbnum]; + logical_slot_infos_query = get_old_cluster_logical_slot_infos_query(); + upgrade_task_add_step(task, + logical_slot_infos_query, + process_old_cluster_logical_slot_infos, + true, NULL); + } - get_rel_infos(cluster, pDbInfo); + upgrade_task_run(task, cluster); + upgrade_task_free(task); - if (cluster == &old_cluster) - get_old_cluster_logical_slot_infos(pDbInfo); - } + pg_free(rel_infos_query); + if (logical_slot_infos_query) + pg_free(logical_slot_infos_query); if (cluster == &old_cluster) pg_log(PG_VERBOSE, "\nsource databases:"); @@ -431,40 +458,21 @@ get_db_infos(ClusterInfo *cluster) /* - * get_rel_infos() + * get_rel_infos_query() * - * gets the relinfos for all the user tables and indexes of the database - * referred to by "dbinfo". + * Returns the query for retrieving the relation information for all the user + * tables and indexes in the database, for use by get_db_rel_and_slot_infos()'s + * UpgradeTask. 
* - * Note: the resulting RelInfo array is assumed to be sorted by OID. - * This allows later processing to match up old and new databases efficiently. + * Note: the result is assumed to be sorted by OID. This allows later + * processing to match up old and new databases efficiently. */ -static void -get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) +static char * +get_rel_infos_query(void) { - PGconn *conn = connectToServer(cluster, - dbinfo->db_name); - PGresult *res; - RelInfo *relinfos; - int ntups; - int relnum; - int num_rels = 0; - char *nspname = NULL; - char *relname = NULL; - char *tablespace = NULL; - int i_spclocation, - i_nspname, - i_relname, - i_reloid, - i_indtable, - i_toastheap, - i_relfilenumber, - i_reltablespace; - char query[QUERY_ALLOC]; - char *last_namespace = NULL, - *last_tablespace = NULL; + PQExpBufferData query; - query[0] = '\0'; /* initialize query string to empty */ + initPQExpBuffer(&query); /* * Create a CTE that collects OIDs of regular user tables and matviews, @@ -476,34 +484,34 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * output, so we have to copy that system table. It's easiest to do that * by treating it as a user table. 
*/ - snprintf(query + strlen(query), sizeof(query) - strlen(query), - "WITH regular_heap (reloid, indtable, toastheap) AS ( " - " SELECT c.oid, 0::oid, 0::oid " - " FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n " - " ON c.relnamespace = n.oid " - " WHERE relkind IN (" CppAsString2(RELKIND_RELATION) ", " - CppAsString2(RELKIND_MATVIEW) ") AND " + appendPQExpBuffer(&query, + "WITH regular_heap (reloid, indtable, toastheap) AS ( " + " SELECT c.oid, 0::oid, 0::oid " + " FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n " + " ON c.relnamespace = n.oid " + " WHERE relkind IN (" CppAsString2(RELKIND_RELATION) ", " + CppAsString2(RELKIND_MATVIEW) ") AND " /* exclude possible orphaned temp tables */ - " ((n.nspname !~ '^pg_temp_' AND " - " n.nspname !~ '^pg_toast_temp_' AND " - " n.nspname NOT IN ('pg_catalog', 'information_schema', " - " 'binary_upgrade', 'pg_toast') AND " - " c.oid >= %u::pg_catalog.oid) OR " - " (n.nspname = 'pg_catalog' AND " - " relname IN ('pg_largeobject') ))), ", - FirstNormalObjectId); + " ((n.nspname !~ '^pg_temp_' AND " + " n.nspname !~ '^pg_toast_temp_' AND " + " n.nspname NOT IN ('pg_catalog', 'information_schema', " + " 'binary_upgrade', 'pg_toast') AND " + " c.oid >= %u::pg_catalog.oid) OR " + " (n.nspname = 'pg_catalog' AND " + " relname IN ('pg_largeobject') ))), ", + FirstNormalObjectId); /* * Add a CTE that collects OIDs of toast tables belonging to the tables * selected by the regular_heap CTE. (We have to do this separately * because the namespace-name rules above don't work for toast tables.) 
*/ - snprintf(query + strlen(query), sizeof(query) - strlen(query), - " toast_heap (reloid, indtable, toastheap) AS ( " - " SELECT c.reltoastrelid, 0::oid, c.oid " - " FROM regular_heap JOIN pg_catalog.pg_class c " - " ON regular_heap.reloid = c.oid " - " WHERE c.reltoastrelid != 0), "); + appendPQExpBufferStr(&query, + " toast_heap (reloid, indtable, toastheap) AS ( " + " SELECT c.reltoastrelid, 0::oid, c.oid " + " FROM regular_heap JOIN pg_catalog.pg_class c " + " ON regular_heap.reloid = c.oid " + " WHERE c.reltoastrelid != 0), "); /* * Add a CTE that collects OIDs of all valid indexes on the previously @@ -511,53 +519,68 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * Testing indisready is necessary in 9.2, and harmless in earlier/later * versions. */ - snprintf(query + strlen(query), sizeof(query) - strlen(query), - " all_index (reloid, indtable, toastheap) AS ( " - " SELECT indexrelid, indrelid, 0::oid " - " FROM pg_catalog.pg_index " - " WHERE indisvalid AND indisready " - " AND indrelid IN " - " (SELECT reloid FROM regular_heap " - " UNION ALL " - " SELECT reloid FROM toast_heap)) "); + appendPQExpBufferStr(&query, + " all_index (reloid, indtable, toastheap) AS ( " + " SELECT indexrelid, indrelid, 0::oid " + " FROM pg_catalog.pg_index " + " WHERE indisvalid AND indisready " + " AND indrelid IN " + " (SELECT reloid FROM regular_heap " + " UNION ALL " + " SELECT reloid FROM toast_heap)) "); /* * And now we can write the query that retrieves the data we want for each * heap and index relation. Make sure result is sorted by OID. 
*/ - snprintf(query + strlen(query), sizeof(query) - strlen(query), - "SELECT all_rels.*, n.nspname, c.relname, " - " c.relfilenode, c.reltablespace, " - " pg_catalog.pg_tablespace_location(t.oid) AS spclocation " - "FROM (SELECT * FROM regular_heap " - " UNION ALL " - " SELECT * FROM toast_heap " - " UNION ALL " - " SELECT * FROM all_index) all_rels " - " JOIN pg_catalog.pg_class c " - " ON all_rels.reloid = c.oid " - " JOIN pg_catalog.pg_namespace n " - " ON c.relnamespace = n.oid " - " LEFT OUTER JOIN pg_catalog.pg_tablespace t " - " ON c.reltablespace = t.oid " - "ORDER BY 1;"); - - res = executeQueryOrDie(conn, "%s", query); - - ntups = PQntuples(res); + appendPQExpBufferStr(&query, + "SELECT all_rels.*, n.nspname, c.relname, " + " c.relfilenode, c.reltablespace, " + " pg_catalog.pg_tablespace_location(t.oid) AS spclocation " + "FROM (SELECT * FROM regular_heap " + " UNION ALL " + " SELECT * FROM toast_heap " + " UNION ALL " + " SELECT * FROM all_index) all_rels " + " JOIN pg_catalog.pg_class c " + " ON all_rels.reloid = c.oid " + " JOIN pg_catalog.pg_namespace n " + " ON c.relnamespace = n.oid " + " LEFT OUTER JOIN pg_catalog.pg_tablespace t " + " ON c.reltablespace = t.oid " + "ORDER BY 1;"); + + return query.data; +} - relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups); +/* + * Callback function for processing results of the query returned by + * get_rel_infos_query(), which is used for get_db_rel_and_slot_infos()'s + * UpgradeTask. This function stores the relation information for later use. 
+ */ +static void +process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg) +{ + int ntups = PQntuples(res); + RelInfo *relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups); + int i_reloid = PQfnumber(res, "reloid"); + int i_indtable = PQfnumber(res, "indtable"); + int i_toastheap = PQfnumber(res, "toastheap"); + int i_nspname = PQfnumber(res, "nspname"); + int i_relname = PQfnumber(res, "relname"); + int i_relfilenumber = PQfnumber(res, "relfilenode"); + int i_reltablespace = PQfnumber(res, "reltablespace"); + int i_spclocation = PQfnumber(res, "spclocation"); + int num_rels = 0; + char *nspname = NULL; + char *relname = NULL; + char *tablespace = NULL; + char *last_namespace = NULL; + char *last_tablespace = NULL; - i_reloid = PQfnumber(res, "reloid"); - i_indtable = PQfnumber(res, "indtable"); - i_toastheap = PQfnumber(res, "toastheap"); - i_nspname = PQfnumber(res, "nspname"); - i_relname = PQfnumber(res, "relname"); - i_relfilenumber = PQfnumber(res, "relfilenode"); - i_reltablespace = PQfnumber(res, "reltablespace"); - i_spclocation = PQfnumber(res, "spclocation"); + AssertVariableIsOfType(&process_rel_infos, UpgradeTaskProcessCB); - for (relnum = 0; relnum < ntups; relnum++) + for (int relnum = 0; relnum < ntups; relnum++) { RelInfo *curr = &relinfos[num_rels++]; @@ -610,44 +633,22 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) /* A zero reltablespace oid indicates the database tablespace. */ curr->tablespace = dbinfo->db_tablespace; } - PQclear(res); - - PQfinish(conn); dbinfo->rel_arr.rels = relinfos; dbinfo->rel_arr.nrels = num_rels; } /* - * get_old_cluster_logical_slot_infos() - * - * Gets the LogicalSlotInfos for all the logical replication slots of the - * database referred to by "dbinfo". The status of each logical slot is gotten - * here, but they are used at the checking phase. See - * check_old_cluster_for_valid_slots(). 
+ * get_old_cluster_logical_slot_infos_query() * - * Note: This function will not do anything if the old cluster is pre-PG17. - * This is because before that the logical slots are not saved at shutdown, so - * there is no guarantee that the latest confirmed_flush_lsn is saved to disk - * which can lead to data loss. It is still not guaranteed for manually created - * slots in PG17, so subsequent checks done in - * check_old_cluster_for_valid_slots() would raise a FATAL error if such slots - * are included. + * Returns the query for retrieving the logical slot information for all the + * logical replication slots in the database, for use by + * get_db_rel_and_slot_infos()'s UpgradeTask. The status of each logical slot + * is checked in check_old_cluster_for_valid_slots(). */ -static void -get_old_cluster_logical_slot_infos(DbInfo *dbinfo) +static char * +get_old_cluster_logical_slot_infos_query(void) { - PGconn *conn; - PGresult *res; - LogicalSlotInfo *slotinfos = NULL; - int num_slots; - - /* Logical slots can be migrated since PG17. */ - if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600) - return; - - conn = connectToServer(&old_cluster, dbinfo->db_name); - /* * Fetch the logical replication slot information. The check whether the * slot is considered caught up is done by an upgrade function. This @@ -665,18 +666,32 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo) * started and stopped several times causing any temporary slots to be * removed. */ - res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, " - "%s as caught_up, invalidation_reason IS NOT NULL as invalid " - "FROM pg_catalog.pg_replication_slots " - "WHERE slot_type = 'logical' AND " - "database = current_database() AND " - "temporary IS FALSE;", - user_opts.live_check ? 
"FALSE" : - "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " - "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " - "END)"); - - num_slots = PQntuples(res); + return psprintf("SELECT slot_name, plugin, two_phase, failover, " + "%s as caught_up, invalidation_reason IS NOT NULL as invalid " + "FROM pg_catalog.pg_replication_slots " + "WHERE slot_type = 'logical' AND " + "database = current_database() AND " + "temporary IS FALSE;", + user_opts.live_check ? "FALSE" : + "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " + "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " + "END)"); +} + +/* + * Callback function for processing results of the query returned by + * get_old_cluster_logical_slot_infos_query(), which is used for + * get_db_rel_and_slot_infos()'s UpgradeTask. This function stores the logical + * slot information for later use. + */ +static void +process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg) +{ + LogicalSlotInfo *slotinfos = NULL; + int num_slots = PQntuples(res); + + AssertVariableIsOfType(&process_old_cluster_logical_slot_infos, + UpgradeTaskProcessCB); if (num_slots) { @@ -709,9 +724,6 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo) } } - PQclear(res); - PQfinish(conn); - dbinfo->slot_arr.slots = slotinfos; dbinfo->slot_arr.nslots = num_slots; } -- 2.39.3 (Apple Git-146)
>From 0abadb72bf3675b0d48721b69bdaa5a734827112 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Wed, 28 Aug 2024 13:32:33 -0500 Subject: [PATCH v10 04/11] Use pg_upgrade's new parallel framework to get loadable libraries. Reviewed-by: FIXME Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13 --- src/bin/pg_upgrade/function.c | 71 ++++++++++++++++++++++------------- 1 file changed, 45 insertions(+), 26 deletions(-) diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c index 7e3abed098..0588347b49 100644 --- a/src/bin/pg_upgrade/function.c +++ b/src/bin/pg_upgrade/function.c @@ -42,6 +42,30 @@ library_name_compare(const void *p1, const void *p2) ((const LibraryInfo *) p2)->dbnum); } +/* + * Private state for get_loadable_libraries()'s UpgradeTask. + */ +struct loadable_libraries_state +{ + PGresult **ress; /* results for each database */ + int totaltups; /* number of tuples in all results */ +}; + +/* + * Callback function for processing results of query for + * get_loadable_libraries()'s UpgradeTask. This function stores the results + * for later use within get_loadable_libraries(). 
+ */ +static void +process_loadable_libraries(DbInfo *dbinfo, PGresult *res, void *arg) +{ + struct loadable_libraries_state *state = (struct loadable_libraries_state *) arg; + + AssertVariableIsOfType(&process_loadable_libraries, UpgradeTaskProcessCB); + + state->ress[dbinfo - old_cluster.dbarr.dbs] = res; + state->totaltups += PQntuples(res); +} /* * get_loadable_libraries() @@ -54,47 +78,41 @@ library_name_compare(const void *p1, const void *p2) void get_loadable_libraries(void) { - PGresult **ress; int totaltups; int dbnum; int n_libinfos; + UpgradeTask *task = upgrade_task_create(); + struct loadable_libraries_state state; + char *query; - ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *)); - totaltups = 0; + state.ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *)); + state.totaltups = 0; - /* Fetch all library names, removing duplicates within each DB */ - for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) - { - DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(&old_cluster, active_db->db_name); + query = psprintf("SELECT DISTINCT probin " + "FROM pg_catalog.pg_proc " + "WHERE prolang = %u AND " + "probin IS NOT NULL AND " + "oid >= %u;", + ClanguageId, + FirstNormalObjectId); - /* - * Fetch all libraries containing non-built-in C functions in this DB. - */ - ress[dbnum] = executeQueryOrDie(conn, - "SELECT DISTINCT probin " - "FROM pg_catalog.pg_proc " - "WHERE prolang = %u AND " - "probin IS NOT NULL AND " - "oid >= %u;", - ClanguageId, - FirstNormalObjectId); - totaltups += PQntuples(ress[dbnum]); - - PQfinish(conn); - } + upgrade_task_add_step(task, query, process_loadable_libraries, + false, &state); + + upgrade_task_run(task, &old_cluster); + upgrade_task_free(task); /* * Allocate memory for required libraries and logical replication output * plugins. 
*/ - n_libinfos = totaltups + count_old_cluster_logical_slots(); + n_libinfos = state.totaltups + count_old_cluster_logical_slots(); os_info.libraries = (LibraryInfo *) pg_malloc(sizeof(LibraryInfo) * n_libinfos); totaltups = 0; for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) { - PGresult *res = ress[dbnum]; + PGresult *res = state.ress[dbnum]; int ntups; int rowno; LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr; @@ -129,7 +147,8 @@ get_loadable_libraries(void) } } - pg_free(ress); + pg_free(state.ress); + pg_free(query); os_info.num_libraries = totaltups; } -- 2.39.3 (Apple Git-146)
>From d12f81e0320b388434b24d1d679571764e7fbd06 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Wed, 28 Aug 2024 14:18:39 -0500 Subject: [PATCH v10 05/11] Use pg_upgrade's new parallel framework for extension updates. Reviewed-by: FIXME Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13 --- src/bin/pg_upgrade/version.c | 94 +++++++++++++++++++----------------- 1 file changed, 49 insertions(+), 45 deletions(-) diff --git a/src/bin/pg_upgrade/version.c b/src/bin/pg_upgrade/version.c index 2de6dffccd..5084b08805 100644 --- a/src/bin/pg_upgrade/version.c +++ b/src/bin/pg_upgrade/version.c @@ -139,6 +139,41 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode) check_ok(); } +/* + * Callback function for processing results of query for + * report_extension_updates()'s UpgradeTask. If the query returned any rows, + * write the details to the report file. + */ +static void +process_extension_updates(DbInfo *dbinfo, PGresult *res, void *arg) +{ + bool db_used = false; + int ntups = PQntuples(res); + int i_name = PQfnumber(res, "name"); + UpgradeTaskReport *report = (UpgradeTaskReport *) arg; + + AssertVariableIsOfType(&process_extension_updates, UpgradeTaskProcessCB); + + for (int rowno = 0; rowno < ntups; rowno++) + { + if (report->file == NULL && + (report->file = fopen_priv(report->path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", report->path); + if (!db_used) + { + PQExpBufferData connectbuf; + + initPQExpBuffer(&connectbuf); + appendPsqlMetaConnect(&connectbuf, dbinfo->db_name); + fputs(connectbuf.data, report->file); + termPQExpBuffer(&connectbuf); + db_used = true; + } + fprintf(report->file, "ALTER EXTENSION %s UPDATE;\n", + quote_identifier(PQgetvalue(res, rowno, i_name))); + } +} + /* * report_extension_updates() * Report extensions that should be updated. 
@@ -146,57 +181,26 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode) void report_extension_updates(ClusterInfo *cluster) { - int dbnum; - FILE *script = NULL; - char *output_path = "update_extensions.sql"; + UpgradeTaskReport report; + UpgradeTask *task = upgrade_task_create(); + const char *query = "SELECT name " + "FROM pg_available_extensions " + "WHERE installed_version != default_version"; prep_status("Checking for extension updates"); - for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) - { - PGresult *res; - bool db_used = false; - int ntups; - int rowno; - int i_name; - DbInfo *active_db = &cluster->dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(cluster, active_db->db_name); - - /* find extensions needing updates */ - res = executeQueryOrDie(conn, - "SELECT name " - "FROM pg_available_extensions " - "WHERE installed_version != default_version" - ); + report.file = NULL; + strcpy(report.path, "update_extensions.sql"); - ntups = PQntuples(res); - i_name = PQfnumber(res, "name"); - for (rowno = 0; rowno < ntups; rowno++) - { - if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - if (!db_used) - { - PQExpBufferData connectbuf; + upgrade_task_add_step(task, query, process_extension_updates, + true, &report); - initPQExpBuffer(&connectbuf); - appendPsqlMetaConnect(&connectbuf, active_db->db_name); - fputs(connectbuf.data, script); - termPQExpBuffer(&connectbuf); - db_used = true; - } - fprintf(script, "ALTER EXTENSION %s UPDATE;\n", - quote_identifier(PQgetvalue(res, rowno, i_name))); - } + upgrade_task_run(task, cluster); + upgrade_task_free(task); - PQclear(res); - - PQfinish(conn); - } - - if (script) + if (report.file) { - fclose(script); + fclose(report.file); report_status(PG_REPORT, "notice"); pg_log(PG_REPORT, "\n" "Your installation contains extensions that should be updated\n" @@ -204,7 +208,7 @@ report_extension_updates(ClusterInfo *cluster) " 
%s\n" "when executed by psql by the database superuser will update\n" "these extensions.", - output_path); + report.path); } else check_ok(); -- 2.39.3 (Apple Git-146)
>From 7a5405cdd97bfb0a4a69167c412ef47b4380ba94 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Sat, 6 Jul 2024 21:06:31 -0500 Subject: [PATCH v10 06/11] Use pg_upgrade's new parallel framework for data type checks. Reviewed-by: FIXME Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13 --- src/bin/pg_upgrade/check.c | 351 ++++++++++++++++++++----------------- 1 file changed, 191 insertions(+), 160 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index f8160e0140..f935b53e1f 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -314,6 +314,147 @@ static DataTypesUsageChecks data_types_usage_checks[] = } }; +/* + * Private state for check_for_data_types_usage()'s UpgradeTask. + */ +struct data_type_check_state +{ + DataTypesUsageChecks *check; /* the check for this step */ + bool *result; /* true if check failed for any database */ + PQExpBuffer *report; /* buffer for report on failed checks */ +}; + +/* + * Returns a palloc'd query string for the data type check, for use by + * check_for_data_types_usage()'s UpgradeTask. 
+ */ +static char * +data_type_check_query(int checknum) +{ + DataTypesUsageChecks *check = &data_types_usage_checks[checknum]; + + return psprintf("WITH RECURSIVE oids AS ( " + /* start with the type(s) returned by base_query */ + " %s " + " UNION ALL " + " SELECT * FROM ( " + /* inner WITH because we can only reference the CTE once */ + " WITH x AS (SELECT oid FROM oids) " + /* domains on any type selected so far */ + " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' " + " UNION ALL " + /* arrays over any type selected so far */ + " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' " + " UNION ALL " + /* composite types containing any type selected so far */ + " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x " + " WHERE t.typtype = 'c' AND " + " t.oid = c.reltype AND " + " c.oid = a.attrelid AND " + " NOT a.attisdropped AND " + " a.atttypid = x.oid " + " UNION ALL " + /* ranges containing any type selected so far */ + " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x " + " WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid" + " ) foo " + ") " + /* now look for stored columns of any such type */ + "SELECT n.nspname, c.relname, a.attname " + "FROM pg_catalog.pg_class c, " + " pg_catalog.pg_namespace n, " + " pg_catalog.pg_attribute a " + "WHERE c.oid = a.attrelid AND " + " NOT a.attisdropped AND " + " a.atttypid IN (SELECT oid FROM oids) AND " + " c.relkind IN (" + CppAsString2(RELKIND_RELATION) ", " + CppAsString2(RELKIND_MATVIEW) ", " + CppAsString2(RELKIND_INDEX) ") AND " + " c.relnamespace = n.oid AND " + /* exclude possible orphaned temp tables */ + " n.nspname !~ '^pg_temp_' AND " + " n.nspname !~ '^pg_toast_temp_' AND " + /* exclude system catalogs, too */ + " n.nspname NOT IN ('pg_catalog', 'information_schema')", + check->base_query); +} + +/* + * Callback function for processing results of queries for + * 
check_for_data_types_usage()'s UpgradeTask. If the query returned any rows + * (i.e., the check failed), write the details to the report file. + */ +static void +process_data_type_check(DbInfo *dbinfo, PGresult *res, void *arg) +{ + struct data_type_check_state *state = (struct data_type_check_state *) arg; + int ntups = PQntuples(res); + + AssertVariableIsOfType(&process_data_type_check, UpgradeTaskProcessCB); + + if (ntups) + { + char output_path[MAXPGPATH]; + int i_nspname; + int i_relname; + int i_attname; + FILE *script = NULL; + bool db_used = false; + + snprintf(output_path, sizeof(output_path), "%s/%s", + log_opts.basedir, + state->check->report_filename); + + /* + * Make sure we have a buffer to save reports to now that we found a + * first failing check. + */ + if (*state->report == NULL) + *state->report = createPQExpBuffer(); + + /* + * If this is the first time we see an error for the check in question + * then print a status message of the failure. + */ + if (!(*state->result)) + { + pg_log(PG_REPORT, " failed check: %s", _(state->check->status)); + appendPQExpBuffer(*state->report, "\n%s\n%s %s\n", + _(state->check->report_text), + _("A list of the problem columns is in the file:"), + output_path); + } + *state->result = true; + + i_nspname = PQfnumber(res, "nspname"); + i_relname = PQfnumber(res, "relname"); + i_attname = PQfnumber(res, "attname"); + + for (int rowno = 0; rowno < ntups; rowno++) + { + if (script == NULL && (script = fopen_priv(output_path, "a")) == NULL) + pg_fatal("could not open file \"%s\": %m", output_path); + + if (!db_used) + { + fprintf(script, "In database: %s\n", dbinfo->db_name); + db_used = true; + } + fprintf(script, " %s.%s.%s\n", + PQgetvalue(res, rowno, i_nspname), + PQgetvalue(res, rowno, i_relname), + PQgetvalue(res, rowno, i_attname)); + } + + if (script) + { + fclose(script); + script = NULL; + } + } +} + /* * check_for_data_types_usage() * Detect whether there are any stored columns depending on given type(s) @@ 
-334,13 +475,15 @@ static DataTypesUsageChecks data_types_usage_checks[] = * there's no storage involved in a view. */ static void -check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks) +check_for_data_types_usage(ClusterInfo *cluster) { - bool found = false; bool *results; - PQExpBufferData report; - DataTypesUsageChecks *tmp = checks; + PQExpBuffer report = NULL; + DataTypesUsageChecks *tmp = data_types_usage_checks; int n_data_types_usage_checks = 0; + UpgradeTask *task = upgrade_task_create(); + char **queries = NULL; + struct data_type_check_state *states; prep_status("Checking data type usage"); @@ -353,175 +496,63 @@ check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks) /* Prepare an array to store the results of checks in */ results = pg_malloc0(sizeof(bool) * n_data_types_usage_checks); + queries = pg_malloc0(sizeof(char *) * n_data_types_usage_checks); + states = pg_malloc0(sizeof(struct data_type_check_state) * n_data_types_usage_checks); - /* - * Connect to each database in the cluster and run all defined checks - * against that database before trying the next one. - */ - for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) + for (int i = 0; i < n_data_types_usage_checks; i++) { - DbInfo *active_db = &cluster->dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(cluster, active_db->db_name); + DataTypesUsageChecks *check = &data_types_usage_checks[i]; - for (int checknum = 0; checknum < n_data_types_usage_checks; checknum++) + if (check->threshold_version == MANUAL_CHECK) { - PGresult *res; - int ntups; - int i_nspname; - int i_relname; - int i_attname; - FILE *script = NULL; - bool db_used = false; - char output_path[MAXPGPATH]; - DataTypesUsageChecks *cur_check = &checks[checknum]; - - if (cur_check->threshold_version == MANUAL_CHECK) - { - Assert(cur_check->version_hook); - - /* - * Make sure that the check applies to the current cluster - * version and skip if not. 
If no check hook has been defined - * we run the check for all versions. - */ - if (!cur_check->version_hook(cluster)) - continue; - } - else if (cur_check->threshold_version != ALL_VERSIONS) - { - if (GET_MAJOR_VERSION(cluster->major_version) > cur_check->threshold_version) - continue; - } - else - Assert(cur_check->threshold_version == ALL_VERSIONS); - - snprintf(output_path, sizeof(output_path), "%s/%s", - log_opts.basedir, - cur_check->report_filename); + Assert(check->version_hook); /* - * The type(s) of interest might be wrapped in a domain, array, - * composite, or range, and these container types can be nested - * (to varying extents depending on server version, but that's not - * of concern here). To handle all these cases we need a - * recursive CTE. + * Make sure that the check applies to the current cluster version + * and skip it if not. */ - res = executeQueryOrDie(conn, - "WITH RECURSIVE oids AS ( " - /* start with the type(s) returned by base_query */ - " %s " - " UNION ALL " - " SELECT * FROM ( " - /* inner WITH because we can only reference the CTE once */ - " WITH x AS (SELECT oid FROM oids) " - /* domains on any type selected so far */ - " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' " - " UNION ALL " - /* arrays over any type selected so far */ - " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' " - " UNION ALL " - /* composite types containing any type selected so far */ - " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x " - " WHERE t.typtype = 'c' AND " - " t.oid = c.reltype AND " - " c.oid = a.attrelid AND " - " NOT a.attisdropped AND " - " a.atttypid = x.oid " - " UNION ALL " - /* ranges containing any type selected so far */ - " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x " - " WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid" - " ) foo " - ") " - /* now look for stored columns of 
any such type */ - "SELECT n.nspname, c.relname, a.attname " - "FROM pg_catalog.pg_class c, " - " pg_catalog.pg_namespace n, " - " pg_catalog.pg_attribute a " - "WHERE c.oid = a.attrelid AND " - " NOT a.attisdropped AND " - " a.atttypid IN (SELECT oid FROM oids) AND " - " c.relkind IN (" - CppAsString2(RELKIND_RELATION) ", " - CppAsString2(RELKIND_MATVIEW) ", " - CppAsString2(RELKIND_INDEX) ") AND " - " c.relnamespace = n.oid AND " - /* exclude possible orphaned temp tables */ - " n.nspname !~ '^pg_temp_' AND " - " n.nspname !~ '^pg_toast_temp_' AND " - /* exclude system catalogs, too */ - " n.nspname NOT IN ('pg_catalog', 'information_schema')", - cur_check->base_query); - - ntups = PQntuples(res); + if (!check->version_hook(cluster)) + continue; + } + else if (check->threshold_version != ALL_VERSIONS) + { + if (GET_MAJOR_VERSION(cluster->major_version) > check->threshold_version) + continue; + } + else + Assert(check->threshold_version == ALL_VERSIONS); - /* - * The datatype was found, so extract the data and log to the - * requested filename. We need to open the file for appending - * since the check might have already found the type in another - * database earlier in the loop. - */ - if (ntups) - { - /* - * Make sure we have a buffer to save reports to now that we - * found a first failing check. - */ - if (!found) - initPQExpBuffer(&report); - found = true; - - /* - * If this is the first time we see an error for the check in - * question then print a status message of the failure. 
- */ - if (!results[checknum]) - { - pg_log(PG_REPORT, " failed check: %s", _(cur_check->status)); - appendPQExpBuffer(&report, "\n%s\n%s %s\n", - _(cur_check->report_text), - _("A list of the problem columns is in the file:"), - output_path); - } - results[checknum] = true; - - i_nspname = PQfnumber(res, "nspname"); - i_relname = PQfnumber(res, "relname"); - i_attname = PQfnumber(res, "attname"); - - for (int rowno = 0; rowno < ntups; rowno++) - { - if (script == NULL && (script = fopen_priv(output_path, "a")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - - if (!db_used) - { - fprintf(script, "In database: %s\n", active_db->db_name); - db_used = true; - } - fprintf(script, " %s.%s.%s\n", - PQgetvalue(res, rowno, i_nspname), - PQgetvalue(res, rowno, i_relname), - PQgetvalue(res, rowno, i_attname)); - } - - if (script) - { - fclose(script); - script = NULL; - } - } + queries[i] = data_type_check_query(i); - PQclear(res); - } + states[i].check = check; + states[i].result = &results[i]; + states[i].report = &report; - PQfinish(conn); + upgrade_task_add_step(task, queries[i], process_data_type_check, + true, &states[i]); } - if (found) - pg_fatal("Data type checks failed: %s", report.data); + /* + * Run every check added above against each database in the cluster; the + * task framework hands each query result to process_data_type_check(). + */ + upgrade_task_run(task, cluster); + upgrade_task_free(task); + + if (report) + { + pg_fatal("Data type checks failed: %s", report->data); + destroyPQExpBuffer(report); + } pg_free(results); + for (int i = 0; i < n_data_types_usage_checks; i++) + { + if (queries[i]) + pg_free(queries[i]); + } + pg_free(queries); + pg_free(states); check_ok(); } @@ -616,7 +647,7 @@ check_and_dump_old_cluster(void) check_old_cluster_subscription_state(); } - check_for_data_types_usage(&old_cluster, data_types_usage_checks); + check_for_data_types_usage(&old_cluster); /* * PG 14 changed the function signature of encoding conversion functions.
-- 2.39.3 (Apple Git-146)
>From dd239126aa373da9ab86f5cd8b9ec90d4e8fb64a Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Mon, 8 Jul 2024 21:00:20 -0500 Subject: [PATCH v10 07/11] Use pg_upgrade's new parallel framework for isn and int8 check. Reviewed-by: FIXME Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13 --- src/bin/pg_upgrade/check.c | 97 ++++++++++++++++++++------------------ 1 file changed, 50 insertions(+), 47 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index f935b53e1f..b8af7e541b 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1225,6 +1225,39 @@ check_for_prepared_transactions(ClusterInfo *cluster) check_ok(); } +/* + * Callback function for processing result of query for + * check_for_isn_and_int8_passing_mismatch()'s UpgradeTask. If the query + * returned any rows (i.e., the check failed), write the details to the report + * file. + */ +static void +process_isn_and_int8_passing_mismatch(DbInfo *dbinfo, PGresult *res, void *arg) +{ + bool db_used = false; + int ntups = PQntuples(res); + int i_nspname = PQfnumber(res, "nspname"); + int i_proname = PQfnumber(res, "proname"); + UpgradeTaskReport *report = (UpgradeTaskReport *) arg; + + AssertVariableIsOfType(&process_isn_and_int8_passing_mismatch, + UpgradeTaskProcessCB); + + for (int rowno = 0; rowno < ntups; rowno++) + { + if (report->file == NULL && + (report->file = fopen_priv(report->path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", report->path); + if (!db_used) + { + fprintf(report->file, "In database: %s\n", dbinfo->db_name); + db_used = true; + } + fprintf(report->file, " %s.%s\n", + PQgetvalue(res, rowno, i_nspname), + PQgetvalue(res, rowno, i_proname)); + } +} /* * check_for_isn_and_int8_passing_mismatch() @@ -1236,9 +1269,13 @@ check_for_prepared_transactions(ClusterInfo *cluster) static void check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster) { - int dbnum; - FILE *script = 
NULL; - char output_path[MAXPGPATH]; + UpgradeTask *task; + UpgradeTaskReport report; + const char *query = "SELECT n.nspname, p.proname " + "FROM pg_catalog.pg_proc p, " + " pg_catalog.pg_namespace n " + "WHERE p.pronamespace = n.oid AND " + " p.probin = '$libdir/isn'"; prep_status("Checking for contrib/isn with bigint-passing mismatch"); @@ -1250,54 +1287,20 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster) return; } - snprintf(output_path, sizeof(output_path), "%s/%s", + report.file = NULL; + snprintf(report.path, sizeof(report.path), "%s/%s", log_opts.basedir, "contrib_isn_and_int8_pass_by_value.txt"); - for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) - { - PGresult *res; - bool db_used = false; - int ntups; - int rowno; - int i_nspname, - i_proname; - DbInfo *active_db = &cluster->dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(cluster, active_db->db_name); - - /* Find any functions coming from contrib/isn */ - res = executeQueryOrDie(conn, - "SELECT n.nspname, p.proname " - "FROM pg_catalog.pg_proc p, " - " pg_catalog.pg_namespace n " - "WHERE p.pronamespace = n.oid AND " - " p.probin = '$libdir/isn'"); - - ntups = PQntuples(res); - i_nspname = PQfnumber(res, "nspname"); - i_proname = PQfnumber(res, "proname"); - for (rowno = 0; rowno < ntups; rowno++) - { - if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - if (!db_used) - { - fprintf(script, "In database: %s\n", active_db->db_name); - db_used = true; - } - fprintf(script, " %s.%s\n", - PQgetvalue(res, rowno, i_nspname), - PQgetvalue(res, rowno, i_proname)); - } - - PQclear(res); - - PQfinish(conn); - } + task = upgrade_task_create(); + upgrade_task_add_step(task, query, process_isn_and_int8_passing_mismatch, + true, &report); + upgrade_task_run(task, cluster); + upgrade_task_free(task); - if (script) + if (report.file) { - fclose(script); + fclose(report.file); pg_log(PG_REPORT, "fatal"); pg_fatal("Your 
installation contains \"contrib/isn\" functions which rely on the\n" "bigint data type. Your old and new clusters pass bigint values\n" @@ -1305,7 +1308,7 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster) "manually dump databases in the old cluster that use \"contrib/isn\"\n" "facilities, drop them, perform the upgrade, and then restore them. A\n" "list of the problem functions is in the file:\n" - " %s", output_path); + " %s", report.path); } else check_ok(); -- 2.39.3 (Apple Git-146)
>From ac3190b807bdac244d448f9abc7cc79782349d26 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Wed, 28 Aug 2024 15:10:19 -0500 Subject: [PATCH v10 08/11] Use pg_upgrade's new parallel framework for postfix operator check. Reviewed-by: FIXME Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13 --- src/bin/pg_upgrade/check.c | 146 +++++++++++++++++++------------------ 1 file changed, 75 insertions(+), 71 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index b8af7e541b..28c4ddbca3 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1315,95 +1315,99 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster) } /* - * Verify that no user defined postfix operators exist. + * Callback function for processing result of query for + * check_for_user_defined_postfix_ops()'s UpgradeTask. If the query returned + * any rows (i.e., the check failed), write the details to the report file. */ static void -check_for_user_defined_postfix_ops(ClusterInfo *cluster) +process_user_defined_postfix_ops(DbInfo *dbinfo, PGresult *res, void *arg) { - int dbnum; - FILE *script = NULL; - char output_path[MAXPGPATH]; + UpgradeTaskReport *report = (UpgradeTaskReport *) arg; + int ntups = PQntuples(res); + bool db_used = false; + int i_oproid = PQfnumber(res, "oproid"); + int i_oprnsp = PQfnumber(res, "oprnsp"); + int i_oprname = PQfnumber(res, "oprname"); + int i_typnsp = PQfnumber(res, "typnsp"); + int i_typname = PQfnumber(res, "typname"); - prep_status("Checking for user-defined postfix operators"); + AssertVariableIsOfType(&process_user_defined_postfix_ops, + UpgradeTaskProcessCB); - snprintf(output_path, sizeof(output_path), "%s/%s", - log_opts.basedir, - "postfix_ops.txt"); + if (!ntups) + return; - /* Find any user defined postfix operators */ - for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) + for (int rowno = 0; rowno < ntups; rowno++) { - PGresult *res; - bool db_used = 
false; - int ntups; - int rowno; - int i_oproid, - i_oprnsp, - i_oprname, - i_typnsp, - i_typname; - DbInfo *active_db = &cluster->dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(cluster, active_db->db_name); - - /* - * The query below hardcodes FirstNormalObjectId as 16384 rather than - * interpolating that C #define into the query because, if that - * #define is ever changed, the cutoff we want to use is the value - * used by pre-version 14 servers, not that of some future version. - */ - res = executeQueryOrDie(conn, - "SELECT o.oid AS oproid, " - " n.nspname AS oprnsp, " - " o.oprname, " - " tn.nspname AS typnsp, " - " t.typname " - "FROM pg_catalog.pg_operator o, " - " pg_catalog.pg_namespace n, " - " pg_catalog.pg_type t, " - " pg_catalog.pg_namespace tn " - "WHERE o.oprnamespace = n.oid AND " - " o.oprleft = t.oid AND " - " t.typnamespace = tn.oid AND " - " o.oprright = 0 AND " - " o.oid >= 16384"); - ntups = PQntuples(res); - i_oproid = PQfnumber(res, "oproid"); - i_oprnsp = PQfnumber(res, "oprnsp"); - i_oprname = PQfnumber(res, "oprname"); - i_typnsp = PQfnumber(res, "typnsp"); - i_typname = PQfnumber(res, "typname"); - for (rowno = 0; rowno < ntups; rowno++) + if (report->file == NULL && + (report->file = fopen_priv(report->path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", report->path); + if (!db_used) { - if (script == NULL && - (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - if (!db_used) - { - fprintf(script, "In database: %s\n", active_db->db_name); - db_used = true; - } - fprintf(script, " (oid=%s) %s.%s (%s.%s, NONE)\n", - PQgetvalue(res, rowno, i_oproid), - PQgetvalue(res, rowno, i_oprnsp), - PQgetvalue(res, rowno, i_oprname), - PQgetvalue(res, rowno, i_typnsp), - PQgetvalue(res, rowno, i_typname)); + fprintf(report->file, "In database: %s\n", dbinfo->db_name); + db_used = true; } + fprintf(report->file, " (oid=%s) %s.%s (%s.%s, NONE)\n", + PQgetvalue(res, rowno, 
i_oproid), + PQgetvalue(res, rowno, i_oprnsp), + PQgetvalue(res, rowno, i_oprname), + PQgetvalue(res, rowno, i_typnsp), + PQgetvalue(res, rowno, i_typname)); + } +} - PQclear(res); +/* + * Verify that no user defined postfix operators exist. + */ +static void +check_for_user_defined_postfix_ops(ClusterInfo *cluster) +{ + UpgradeTaskReport report; + UpgradeTask *task = upgrade_task_create(); + const char *query; - PQfinish(conn); - } + /* + * The query below hardcodes FirstNormalObjectId as 16384 rather than + * interpolating that C #define into the query because, if that #define is + * ever changed, the cutoff we want to use is the value used by + * pre-version 14 servers, not that of some future version. + */ + query = "SELECT o.oid AS oproid, " + " n.nspname AS oprnsp, " + " o.oprname, " + " tn.nspname AS typnsp, " + " t.typname " + "FROM pg_catalog.pg_operator o, " + " pg_catalog.pg_namespace n, " + " pg_catalog.pg_type t, " + " pg_catalog.pg_namespace tn " + "WHERE o.oprnamespace = n.oid AND " + " o.oprleft = t.oid AND " + " t.typnamespace = tn.oid AND " + " o.oprright = 0 AND " + " o.oid >= 16384"; - if (script) + prep_status("Checking for user-defined postfix operators"); + + report.file = NULL; + snprintf(report.path, sizeof(report.path), "%s/%s", + log_opts.basedir, + "postfix_ops.txt"); + + upgrade_task_add_step(task, query, process_user_defined_postfix_ops, + true, &report); + upgrade_task_run(task, cluster); + upgrade_task_free(task); + + if (report.file) { - fclose(script); + fclose(report.file); pg_log(PG_REPORT, "fatal"); pg_fatal("Your installation contains user-defined postfix operators, which are not\n" "supported anymore. Consider dropping the postfix operators and replacing\n" "them with prefix operators or function calls.\n" "A list of user-defined postfix operators is in the file:\n" - " %s", output_path); + " %s", report.path); } else check_ok(); -- 2.39.3 (Apple Git-146)
>From d756f173cf94128c6a4fe11b9a48b90d33631298 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Wed, 28 Aug 2024 15:21:29 -0500 Subject: [PATCH v10 09/11] Use pg_upgrade's new parallel framework for polymorphics check. Reviewed-by: FIXME Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13 --- src/bin/pg_upgrade/check.c | 159 +++++++++++++++++++------------------ 1 file changed, 83 insertions(+), 76 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 28c4ddbca3..92a3aa6a77 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1413,6 +1413,40 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster) check_ok(); } +/* + * Callback function for processing results of query for + * check_for_incompatible_polymorphics()'s UpgradeTask. If the query returned + * any rows (i.e., the check failed), write the details to the report file. + */ +static void +process_incompat_polymorphics(DbInfo *dbinfo, PGresult *res, void *arg) +{ + UpgradeTaskReport *report = (UpgradeTaskReport *) arg; + bool db_used = false; + int ntups = PQntuples(res); + int i_objkind = PQfnumber(res, "objkind"); + int i_objname = PQfnumber(res, "objname"); + + AssertVariableIsOfType(&process_incompat_polymorphics, + UpgradeTaskProcessCB); + + for (int rowno = 0; rowno < ntups; rowno++) + { + if (report->file == NULL && + (report->file = fopen_priv(report->path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", report->path); + if (!db_used) + { + fprintf(report->file, "In database: %s\n", dbinfo->db_name); + db_used = true; + } + + fprintf(report->file, " %s: %s\n", + PQgetvalue(res, rowno, i_objkind), + PQgetvalue(res, rowno, i_objname)); + } +} + /* * check_for_incompatible_polymorphics() * @@ -1422,14 +1456,15 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster) static void check_for_incompatible_polymorphics(ClusterInfo *cluster) { - PGresult *res; - FILE *script = NULL; - char 
output_path[MAXPGPATH]; PQExpBufferData old_polymorphics; + UpgradeTask *task = upgrade_task_create(); + UpgradeTaskReport report; + char *query; prep_status("Checking for incompatible polymorphic functions"); - snprintf(output_path, sizeof(output_path), "%s/%s", + report.file = NULL; + snprintf(report.path, sizeof(report.path), "%s/%s", log_opts.basedir, "incompatible_polymorphics.txt"); @@ -1453,80 +1488,51 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster) ", 'array_positions(anyarray,anyelement)'" ", 'width_bucket(anyelement,anyarray)'"); - for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) - { - bool db_used = false; - DbInfo *active_db = &cluster->dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(cluster, active_db->db_name); - int ntups; - int i_objkind, - i_objname; - - /* - * The query below hardcodes FirstNormalObjectId as 16384 rather than - * interpolating that C #define into the query because, if that - * #define is ever changed, the cutoff we want to use is the value - * used by pre-version 14 servers, not that of some future version. 
- */ - res = executeQueryOrDie(conn, - /* Aggregate transition functions */ - "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname " - "FROM pg_proc AS p " - "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid " - "JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn " - "WHERE p.oid >= 16384 " - "AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) " - "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) " - - /* Aggregate final functions */ - "UNION ALL " - "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname " - "FROM pg_proc AS p " - "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid " - "JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn " - "WHERE p.oid >= 16384 " - "AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) " - "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) " - - /* Operators */ - "UNION ALL " - "SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname " - "FROM pg_operator AS op " - "WHERE op.oid >= 16384 " - "AND oprcode = ANY(ARRAY[%s]::regprocedure[]) " - "AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[]);", - old_polymorphics.data, - old_polymorphics.data, - old_polymorphics.data); - - ntups = PQntuples(res); - - i_objkind = PQfnumber(res, "objkind"); - i_objname = PQfnumber(res, "objname"); - - for (int rowno = 0; rowno < ntups; rowno++) - { - if (script == NULL && - (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - if (!db_used) - { - fprintf(script, "In database: %s\n", active_db->db_name); - db_used = true; - } - - fprintf(script, " %s: %s\n", - PQgetvalue(res, rowno, i_objkind), - PQgetvalue(res, rowno, i_objname)); - } + /* + * The query below hardcodes FirstNormalObjectId as 16384 rather than + * interpolating that C #define into the query because, if that #define is + * ever changed, the cutoff we want to use is the value used by + * pre-version 14 servers, not that of some future version. 
+ */ - PQclear(res); - PQfinish(conn); - } + /* Aggregate transition functions */ + query = psprintf("SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname " + "FROM pg_proc AS p " + "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid " + "JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn " + "WHERE p.oid >= 16384 " + "AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) " + "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) " + + /* Aggregate final functions */ + "UNION ALL " + "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname " + "FROM pg_proc AS p " + "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid " + "JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn " + "WHERE p.oid >= 16384 " + "AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) " + "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) " + + /* Operators */ + "UNION ALL " + "SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname " + "FROM pg_operator AS op " + "WHERE op.oid >= 16384 " + "AND oprcode = ANY(ARRAY[%s]::regprocedure[]) " + "AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[]);", + old_polymorphics.data, + old_polymorphics.data, + old_polymorphics.data); + + upgrade_task_add_step(task, query, process_incompat_polymorphics, + true, &report); + upgrade_task_run(task, cluster); + upgrade_task_free(task); - if (script) + if (report.file) { - fclose(script); + fclose(report.file); pg_log(PG_REPORT, "fatal"); pg_fatal("Your installation contains user-defined objects that refer to internal\n" "polymorphic functions with arguments of type \"anyarray\" or \"anyelement\".\n" @@ -1534,12 +1540,13 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster) "afterwards, changing them to refer to the new corresponding functions with\n" "arguments of type \"anycompatiblearray\" and \"anycompatible\".\n" "A list of the problematic objects is in the file:\n" - " %s", output_path); + " %s", report.path); } else check_ok(); 
termPQExpBuffer(&old_polymorphics); + pg_free(query); } /* -- 2.39.3 (Apple Git-146)
>From a6a7710e22d8d879a389de5777f724b45ed2e6f2 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Wed, 28 Aug 2024 15:27:37 -0500 Subject: [PATCH v10 10/11] Use pg_upgrade's new parallel framework for WITH OIDS check. Reviewed-by: FIXME Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13 --- src/bin/pg_upgrade/check.c | 100 +++++++++++++++++++------------------ 1 file changed, 52 insertions(+), 48 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 92a3aa6a77..dff440b29a 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1550,72 +1550,76 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster) } /* - * Verify that no tables are declared WITH OIDS. + * Callback function for processing results of query for + * check_for_tables_with_oids()'s UpgradeTask. If the query returned any rows + * (i.e., the check failed), write the details to the report file. */ static void -check_for_tables_with_oids(ClusterInfo *cluster) +process_with_oids_check(DbInfo *dbinfo, PGresult *res, void *arg) { - int dbnum; - FILE *script = NULL; - char output_path[MAXPGPATH]; + UpgradeTaskReport *report = (UpgradeTaskReport *) arg; + bool db_used = false; + int ntups = PQntuples(res); + int i_nspname = PQfnumber(res, "nspname"); + int i_relname = PQfnumber(res, "relname"); - prep_status("Checking for tables WITH OIDS"); + AssertVariableIsOfType(&process_with_oids_check, UpgradeTaskProcessCB); - snprintf(output_path, sizeof(output_path), "%s/%s", - log_opts.basedir, - "tables_with_oids.txt"); + if (!ntups) + return; - /* Find any tables declared WITH OIDS */ - for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) + for (int rowno = 0; rowno < ntups; rowno++) { - PGresult *res; - bool db_used = false; - int ntups; - int rowno; - int i_nspname, - i_relname; - DbInfo *active_db = &cluster->dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(cluster, active_db->db_name); - - res = 
executeQueryOrDie(conn, - "SELECT n.nspname, c.relname " - "FROM pg_catalog.pg_class c, " - " pg_catalog.pg_namespace n " - "WHERE c.relnamespace = n.oid AND " - " c.relhasoids AND" - " n.nspname NOT IN ('pg_catalog')"); - - ntups = PQntuples(res); - i_nspname = PQfnumber(res, "nspname"); - i_relname = PQfnumber(res, "relname"); - for (rowno = 0; rowno < ntups; rowno++) + if (report->file == NULL && + (report->file = fopen_priv(report->path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", report->path); + if (!db_used) { - if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - if (!db_used) - { - fprintf(script, "In database: %s\n", active_db->db_name); - db_used = true; - } - fprintf(script, " %s.%s\n", - PQgetvalue(res, rowno, i_nspname), - PQgetvalue(res, rowno, i_relname)); + fprintf(report->file, "In database: %s\n", dbinfo->db_name); + db_used = true; } + fprintf(report->file, " %s.%s\n", + PQgetvalue(res, rowno, i_nspname), + PQgetvalue(res, rowno, i_relname)); + } +} - PQclear(res); +/* + * Verify that no tables are declared WITH OIDS. 
+ */ +static void +check_for_tables_with_oids(ClusterInfo *cluster) +{ + UpgradeTaskReport report; + UpgradeTask *task = upgrade_task_create(); + const char *query = "SELECT n.nspname, c.relname " + "FROM pg_catalog.pg_class c, " + " pg_catalog.pg_namespace n " + "WHERE c.relnamespace = n.oid AND " + " c.relhasoids AND" + " n.nspname NOT IN ('pg_catalog')"; - PQfinish(conn); - } + prep_status("Checking for tables WITH OIDS"); - if (script) + report.file = NULL; + snprintf(report.path, sizeof(report.path), "%s/%s", + log_opts.basedir, + "tables_with_oids.txt"); + + upgrade_task_add_step(task, query, process_with_oids_check, + true, &report); + upgrade_task_run(task, cluster); + upgrade_task_free(task); + + if (report.file) { - fclose(script); + fclose(report.file); pg_log(PG_REPORT, "fatal"); pg_fatal("Your installation contains tables declared WITH OIDS, which is not\n" "supported anymore. Consider removing the oid column using\n" " ALTER TABLE ... SET WITHOUT OIDS;\n" "A list of tables with the problem is in the file:\n" - " %s", output_path); + " %s", report.path); } else check_ok(); -- 2.39.3 (Apple Git-146)
>From 3cd66a2e1bbe3fc7a90c96962c158475f715245d Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Wed, 28 Aug 2024 15:35:31 -0500 Subject: [PATCH v10 11/11] Use pg_upgrade's parallel framework for encoding conversion check. Reviewed-by: FIXME Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13 --- src/bin/pg_upgrade/check.c | 120 ++++++++++++++++++++----------------- 1 file changed, 64 insertions(+), 56 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index dff440b29a..01ab3d0694 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1684,81 +1684,89 @@ check_for_pg_role_prefix(ClusterInfo *cluster) } /* - * Verify that no user-defined encoding conversions exist. + * Callback function for processing results of query for + * check_for_user_defined_encoding_conversions()'s UpgradeTask. If the query + * returned any rows (i.e., the check failed), write the details to the report + * file. */ static void -check_for_user_defined_encoding_conversions(ClusterInfo *cluster) +process_user_defined_encoding_conversions(DbInfo *dbinfo, PGresult *res, void *arg) { - int dbnum; - FILE *script = NULL; - char output_path[MAXPGPATH]; + UpgradeTaskReport *report = (UpgradeTaskReport *) arg; + bool db_used = false; + int ntups = PQntuples(res); + int i_conoid = PQfnumber(res, "conoid"); + int i_conname = PQfnumber(res, "conname"); + int i_nspname = PQfnumber(res, "nspname"); - prep_status("Checking for user-defined encoding conversions"); + AssertVariableIsOfType(&process_user_defined_encoding_conversions, + UpgradeTaskProcessCB); - snprintf(output_path, sizeof(output_path), "%s/%s", - log_opts.basedir, - "encoding_conversions.txt"); + if (!ntups) + return; - /* Find any user defined encoding conversions */ - for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) + for (int rowno = 0; rowno < ntups; rowno++) { - PGresult *res; - bool db_used = false; - int ntups; - int rowno; - int 
i_conoid, - i_conname, - i_nspname; - DbInfo *active_db = &cluster->dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(cluster, active_db->db_name); - - /* - * The query below hardcodes FirstNormalObjectId as 16384 rather than - * interpolating that C #define into the query because, if that - * #define is ever changed, the cutoff we want to use is the value - * used by pre-version 14 servers, not that of some future version. - */ - res = executeQueryOrDie(conn, - "SELECT c.oid as conoid, c.conname, n.nspname " - "FROM pg_catalog.pg_conversion c, " - " pg_catalog.pg_namespace n " - "WHERE c.connamespace = n.oid AND " - " c.oid >= 16384"); - ntups = PQntuples(res); - i_conoid = PQfnumber(res, "conoid"); - i_conname = PQfnumber(res, "conname"); - i_nspname = PQfnumber(res, "nspname"); - for (rowno = 0; rowno < ntups; rowno++) + if (report->file == NULL && + (report->file = fopen_priv(report->path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", report->path); + if (!db_used) { - if (script == NULL && - (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - if (!db_used) - { - fprintf(script, "In database: %s\n", active_db->db_name); - db_used = true; - } - fprintf(script, " (oid=%s) %s.%s\n", - PQgetvalue(res, rowno, i_conoid), - PQgetvalue(res, rowno, i_nspname), - PQgetvalue(res, rowno, i_conname)); + fprintf(report->file, "In database: %s\n", dbinfo->db_name); + db_used = true; } + fprintf(report->file, " (oid=%s) %s.%s\n", + PQgetvalue(res, rowno, i_conoid), + PQgetvalue(res, rowno, i_nspname), + PQgetvalue(res, rowno, i_conname)); + } +} - PQclear(res); +/* + * Verify that no user-defined encoding conversions exist. 
+ */ +static void +check_for_user_defined_encoding_conversions(ClusterInfo *cluster) +{ + UpgradeTaskReport report; + UpgradeTask *task = upgrade_task_create(); + const char *query; - PQfinish(conn); - } + prep_status("Checking for user-defined encoding conversions"); - if (script) + report.file = NULL; + snprintf(report.path, sizeof(report.path), "%s/%s", + log_opts.basedir, + "encoding_conversions.txt"); + + /* + * The query below hardcodes FirstNormalObjectId as 16384 rather than + * interpolating that C #define into the query because, if that #define is + * ever changed, the cutoff we want to use is the value used by + * pre-version 14 servers, not that of some future version. + */ + query = "SELECT c.oid as conoid, c.conname, n.nspname " + "FROM pg_catalog.pg_conversion c, " + " pg_catalog.pg_namespace n " + "WHERE c.connamespace = n.oid AND " + " c.oid >= 16384"; + + upgrade_task_add_step(task, query, + process_user_defined_encoding_conversions, + true, &report); + upgrade_task_run(task, cluster); + upgrade_task_free(task); + + if (report.file) { - fclose(script); + fclose(report.file); pg_log(PG_REPORT, "fatal"); pg_fatal("Your installation contains user-defined encoding conversions.\n" "The conversion function parameters changed in PostgreSQL version 14\n" "so this cluster cannot currently be upgraded. You can remove the\n" "encoding conversions in the old cluster and restart the upgrade.\n" "A list of user-defined encoding conversions is in the file:\n" - " %s", output_path); + " %s", report.path); } else check_ok(); -- 2.39.3 (Apple Git-146)