On Mon, Jul 22, 2019 at 02:40:19PM +0200, Julien Rouhaud wrote:
> Right, so I kept the long option.  Also this comment was outdated, as
> --jobs is now just ignored with a list of indexes, so I fixed that
> too.

+   if (!parallel)
+   {
+       if (user_list == NULL)
+       {
+           /*
+            * Create a dummy list with an empty string, as user requires an
+            * element.
+            */
+           process_list = pg_malloc0(sizeof(SimpleStringList));
+           simple_string_list_append(process_list, "");
+       }
+   }
This part looks a bit hacky.  It is needed because we don't have a
list of objects when doing a non-parallel system or database reindex.
All we want is a list with one element: the database of the
connection.  Wouldn't it be more natural to assign the database name
here using PQdb(conn), then add an assertion at the top of
run_reindex_command() checking for a non-NULL name?
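
To make that suggestion concrete, here is a minimal standalone sketch.
It is not code from the patch: StrList, str_list_append(),
build_process_list() and checked_object_name() are hypothetical
stand-ins (for SimpleStringList, simple_string_list_append() and the
top of run_reindex_command()), and connected_db stands for the value
PQdb(conn) would return.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins for SimpleStringList and its cells. */
typedef struct StrCell
{
	struct StrCell *next;
	char	   *val;
} StrCell;

typedef struct StrList
{
	StrCell    *head;
	StrCell    *tail;
} StrList;

/* Append a copy of val at the end of the list. */
static void
str_list_append(StrList *list, const char *val)
{
	size_t		len = strlen(val) + 1;
	StrCell    *cell = malloc(sizeof(StrCell));

	assert(cell != NULL);
	cell->val = malloc(len);
	assert(cell->val != NULL);
	memcpy(cell->val, val, len);
	cell->next = NULL;

	if (list->tail)
		list->tail->next = cell;
	else
		list->head = cell;
	list->tail = cell;
}

/*
 * Return the list to process.  Without a user-supplied list, build a
 * one-element list holding the connected database name instead of a
 * dummy empty string.
 */
static StrList *
build_process_list(StrList *user_list, const char *connected_db)
{
	StrList    *list;

	if (user_list != NULL)
		return user_list;

	assert(connected_db != NULL);
	list = calloc(1, sizeof(StrList));
	assert(list != NULL);
	str_list_append(list, connected_db);
	return list;
}

/* Stand-in for the check at the top of the command runner. */
static const char *
checked_object_name(const char *name)
{
	assert(name != NULL);
	return name;
}
```

With something like this, the non-parallel path always has a real
name to pass down, and the assertion documents that expectation.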

> I considered this, but it would require adapting all code that declares
> a SimpleStringList stack variable (vacuumdb.c, clusterdb.c,
> createuser.c, pg_dumpall.c and pg_dump.c), so it looked like too much
> trouble to avoid 2 local variables here and 1 in vacuumdb.c.  I don't
> have a strong opinion here, so I can go for it if you prefer.

Okay.  This is a tad wider than the original patch proposal, and this
adds two lines.  So let's discard that for now and keep it simple.

>> +$node->issues_sql_like([qw(reindexdb -j2)],
>> +   qr/statement: REINDEX TABLE public.test1/,
>> +   'Global and parallel reindex will issue per-table REINDEX');
>> Would it make sense to have some tests for schemas here?
>>
>> One of my comments in [1] has not been answered.  What about
>> the decomposition of a list of schemas into a list of tables when
>> using the parallel mode?
> 
> I did that in attached v6, and added suitable regression tests.

The two tests for "reindexdb -j2" can be combined into a single call,
checking for both commands to be generated in the same output.  The
second command triggering a reindex on two schemas cannot be used to
check for the generation of both s1.t1 and s2.t2 as the ordering may
not be guaranteed.  The command arrays also looked inconsistent with
the rest.  Attached is an updated patch with some format modifications
and the fixes for the tests.
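
To illustrate the ordering concern: with two jobs, the statements for
s1.t1 and s2.t2 may show up in the server log in either order, so a
single pattern expecting one before the other can fail spuriously.
Below is a minimal sketch of a check that does not depend on
ordering.  It is written in C rather than the Perl of the TAP tests,
and log_contains_all() is a hypothetical helper, not part of the
test framework.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper: return true if every expected statement
 * appears somewhere in the log text, whatever the order.
 */
static bool
log_contains_all(const char *log, const char *const *expected, size_t n)
{
	size_t		i;

	assert(log != NULL);

	for (i = 0; i < n; i++)
	{
		if (strstr(log, expected[i]) == NULL)
			return false;
	}
	return true;
}
```

Each statement is searched for independently, so either interleaving
of the two REINDEX TABLE commands is accepted.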
--
Michael
diff --git a/doc/src/sgml/ref/reindexdb.sgml b/doc/src/sgml/ref/reindexdb.sgml
index 25b5a72770..477b77c6e9 100644
--- a/doc/src/sgml/ref/reindexdb.sgml
+++ b/doc/src/sgml/ref/reindexdb.sgml
@@ -166,6 +166,31 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>-j <replaceable class="parameter">njobs</replaceable></option></term>
+      <term><option>--jobs=<replaceable class="parameter">njobs</replaceable></option></term>
+      <listitem>
+       <para>
+        Execute the reindex commands in parallel by running
+        <replaceable class="parameter">njobs</replaceable>
+        commands simultaneously.  This option reduces the time of the
+        processing but it also increases the load on the database server.
+       </para>
+       <para>
+        <application>reindexdb</application> will open
+        <replaceable class="parameter">njobs</replaceable> connections to the
+        database, so make sure your <xref linkend="guc-max-connections"/>
+        setting is high enough to accommodate all connections.
+       </para>
+       <para>
+        Note that this option is ignored with the <option>--index</option>
+        option to prevent deadlocks when processing multiple indexes from
+        the same relation, and incompatible with the <option>--system</option>
+        option.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-q</option></term>
       <term><option>--quiet</option></term>
diff --git a/src/bin/scripts/Makefile b/src/bin/scripts/Makefile
index 3cd793b134..ede665090f 100644
--- a/src/bin/scripts/Makefile
+++ b/src/bin/scripts/Makefile
@@ -29,7 +29,7 @@ dropdb: dropdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-
 dropuser: dropuser.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
 clusterdb: clusterdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
 vacuumdb: vacuumdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
-reindexdb: reindexdb.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
+reindexdb: reindexdb.o common.o scripts_parallel.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
 pg_isready: pg_isready.o common.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils
 
 install: all installdirs
diff --git a/src/bin/scripts/reindexdb.c b/src/bin/scripts/reindexdb.c
index 219a9a9211..895dc03737 100644
--- a/src/bin/scripts/reindexdb.c
+++ b/src/bin/scripts/reindexdb.c
@@ -10,10 +10,14 @@
  */
 
 #include "postgres_fe.h"
+
+#include "catalog/pg_class_d.h"
 #include "common.h"
 #include "common/logging.h"
+#include "fe_utils/connect.h"
 #include "fe_utils/simple_list.h"
 #include "fe_utils/string_utils.h"
+#include "scripts_parallel.h"
 
 typedef enum ReindexType
 {
@@ -25,16 +29,26 @@ typedef enum ReindexType
 } ReindexType;
 
 
-static void reindex_one_database(const char *name, const char *dbname,
-								 ReindexType type, const char *host,
+static SimpleStringList *get_parallel_object_list(PGconn *conn,
+												  ReindexType type,
+												  SimpleStringList *user_list,
+												  bool echo);
+static void reindex_one_database(const char *dbname, ReindexType type,
+								 SimpleStringList *user_list, const char *host,
 								 const char *port, const char *username,
 								 enum trivalue prompt_password, const char *progname,
-								 bool echo, bool verbose, bool concurrently);
+								 bool echo, bool verbose, bool concurrently,
+								 int concurrentCons);
 static void reindex_all_databases(const char *maintenance_db,
 								  const char *host, const char *port,
 								  const char *username, enum trivalue prompt_password,
 								  const char *progname, bool echo,
-								  bool quiet, bool verbose, bool concurrently);
+								  bool quiet, bool verbose, bool concurrently,
+								  int concurrentCons);
+static void run_reindex_command(PGconn *conn, ReindexType type,
+								const char *name, bool echo, bool verbose,
+								bool concurrently, bool async);
+
 static void help(const char *progname);
 
 int
@@ -54,6 +68,7 @@ main(int argc, char *argv[])
 		{"system", no_argument, NULL, 's'},
 		{"table", required_argument, NULL, 't'},
 		{"index", required_argument, NULL, 'i'},
+		{"jobs", required_argument, NULL, 'j'},
 		{"verbose", no_argument, NULL, 'v'},
 		{"concurrently", no_argument, NULL, 1},
 		{"maintenance-db", required_argument, NULL, 2},
@@ -79,6 +94,9 @@ main(int argc, char *argv[])
 	SimpleStringList indexes = {NULL, NULL};
 	SimpleStringList tables = {NULL, NULL};
 	SimpleStringList schemas = {NULL, NULL};
+	int			concurrentCons = 1;
+	int			tbl_count = 0,
+				nsp_count = 0;
 
 	pg_logging_init(argv[0]);
 	progname = get_progname(argv[0]);
@@ -87,7 +105,7 @@ main(int argc, char *argv[])
 	handle_help_version_opts(argc, argv, "reindexdb", help);
 
 	/* process command-line options */
-	while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:v", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "h:p:U:wWeqS:d:ast:i:j:v", long_options, &optindex)) != -1)
 	{
 		switch (c)
 		{
@@ -114,6 +132,7 @@ main(int argc, char *argv[])
 				break;
 			case 'S':
 				simple_string_list_append(&schemas, optarg);
+				nsp_count++;
 				break;
 			case 'd':
 				dbname = pg_strdup(optarg);
@@ -126,10 +145,25 @@ main(int argc, char *argv[])
 				break;
 			case 't':
 				simple_string_list_append(&tables, optarg);
+				tbl_count++;
 				break;
 			case 'i':
 				simple_string_list_append(&indexes, optarg);
 				break;
+			case 'j':
+				concurrentCons = atoi(optarg);
+				if (concurrentCons <= 0)
+				{
+					pg_log_error("number of parallel jobs must be at least 1");
+					exit(1);
+				}
+				if (concurrentCons > FD_SETSIZE - 1)
+				{
+					pg_log_error("too many parallel jobs requested (maximum: %d)",
+								 FD_SETSIZE - 1);
+					exit(1);
+				}
+				break;
 			case 'v':
 				verbose = true;
 				break;
@@ -194,7 +228,8 @@ main(int argc, char *argv[])
 		}
 
 		reindex_all_databases(maintenance_db, host, port, username,
-							  prompt_password, progname, echo, quiet, verbose, concurrently);
+							  prompt_password, progname, echo, quiet, verbose,
+							  concurrently, concurrentCons);
 	}
 	else if (syscatalog)
 	{
@@ -214,6 +249,12 @@ main(int argc, char *argv[])
 			exit(1);
 		}
 
+		if (concurrentCons > 1)
+		{
+			pg_log_error("cannot use multiple jobs to reindex system catalogs");
+			exit(1);
+		}
+
 		if (dbname == NULL)
 		{
 			if (getenv("PGDATABASE"))
@@ -224,9 +265,9 @@ main(int argc, char *argv[])
 				dbname = get_user_name_or_exit(progname);
 		}
 
-		reindex_one_database(NULL, dbname, REINDEX_SYSTEM, host,
+		reindex_one_database(dbname, REINDEX_SYSTEM, NULL, host,
 							 port, username, prompt_password, progname,
-							 echo, verbose, concurrently);
+							 echo, verbose, concurrently, 1);
 	}
 	else
 	{
@@ -241,61 +282,57 @@ main(int argc, char *argv[])
 		}
 
 		if (schemas.head != NULL)
-		{
-			SimpleStringListCell *cell;
-
-			for (cell = schemas.head; cell; cell = cell->next)
-			{
-				reindex_one_database(cell->val, dbname, REINDEX_SCHEMA, host,
-									 port, username, prompt_password, progname,
-									 echo, verbose, concurrently);
-			}
-		}
+			reindex_one_database(dbname, REINDEX_SCHEMA, &schemas, host,
+								 port, username, prompt_password, progname,
+								 echo, verbose, concurrently,
+								 Min(concurrentCons, nsp_count));
 
 		if (indexes.head != NULL)
 		{
-			SimpleStringListCell *cell;
-
-			for (cell = indexes.head; cell; cell = cell->next)
-			{
-				reindex_one_database(cell->val, dbname, REINDEX_INDEX, host,
-									 port, username, prompt_password, progname,
-									 echo, verbose, concurrently);
-			}
+			/*
+			 * An index list cannot be processed by multiple connections, as
+			 * it could cause conflicts if reindexing multiple indexes from
+			 * the same table.  We simply ignore the passed number of jobs if
+			 * any.
+			 */
+			reindex_one_database(dbname, REINDEX_INDEX, &indexes, host,
+								 port, username, prompt_password, progname,
+								 echo, verbose, concurrently, 1);
 		}
+
 		if (tables.head != NULL)
-		{
-			SimpleStringListCell *cell;
-
-			for (cell = tables.head; cell; cell = cell->next)
-			{
-				reindex_one_database(cell->val, dbname, REINDEX_TABLE, host,
-									 port, username, prompt_password, progname,
-									 echo, verbose, concurrently);
-			}
-		}
+			reindex_one_database(dbname, REINDEX_TABLE, &tables, host,
+								 port, username, prompt_password, progname,
+								 echo, verbose, concurrently,
+								 Min(concurrentCons, tbl_count));
 
 		/*
 		 * reindex database only if neither index nor table nor schema is
 		 * specified
 		 */
 		if (indexes.head == NULL && tables.head == NULL && schemas.head == NULL)
-			reindex_one_database(NULL, dbname, REINDEX_DATABASE, host,
+			reindex_one_database(dbname, REINDEX_DATABASE, NULL, host,
 								 port, username, prompt_password, progname,
-								 echo, verbose, concurrently);
+								 echo, verbose, concurrently, concurrentCons);
 	}
 
 	exit(0);
 }
 
 static void
-reindex_one_database(const char *name, const char *dbname, ReindexType type,
-					 const char *host, const char *port, const char *username,
+reindex_one_database(const char *dbname, ReindexType type,
+					 SimpleStringList *user_list, const char *host,
+					 const char *port, const char *username,
 					 enum trivalue prompt_password, const char *progname, bool echo,
-					 bool verbose, bool concurrently)
+					 bool verbose, bool concurrently, int concurrentCons)
 {
-	PQExpBufferData sql;
 	PGconn	   *conn;
+	SimpleStringListCell *cell;
+	bool		parallel = concurrentCons > 1;
+	SimpleStringList *process_list = user_list;
+	ReindexType process_type = type;
+	ParallelSlot *slots;
+	bool		failed = false;
 
 	conn = connectDatabase(dbname, host, port, username, prompt_password,
 						   progname, echo, false, false);
@@ -308,6 +345,107 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
 		exit(1);
 	}
 
+	if (!parallel)
+	{
+		if (user_list == NULL)
+		{
+			/*
+			 * Create a dummy list with an empty string, as user requires an
+			 * element.
+			 */
+			process_list = pg_malloc0(sizeof(SimpleStringList));
+			simple_string_list_append(process_list, "");
+		}
+	}
+	else
+	{
+		/*
+		 * Database-wide parallel reindex requires special processing.  If
+		 * multiple jobs were asked, we have to reindex system catalogs first,
+		 * as they can't be processed in parallel.
+		 */
+		if (process_type == REINDEX_DATABASE)
+		{
+			Assert(user_list == NULL);
+			run_reindex_command(conn, REINDEX_SYSTEM, NULL, echo, verbose,
+								concurrently, false);
+
+			process_list = get_parallel_object_list(conn, process_type,
+													user_list, echo);
+			process_type = REINDEX_TABLE;
+
+			/* Bail out if nothing to process */
+			if (process_list == NULL)
+			{
+				PQfinish(conn);
+				return;
+			}
+		}
+		else if (process_type == REINDEX_SCHEMA)
+		{
+			Assert(user_list != NULL);
+
+			process_list = get_parallel_object_list(conn, process_type,
+													user_list, echo);
+			process_type = REINDEX_TABLE;
+
+			/* Bail out if nothing to process */
+			if (process_list == NULL)
+			{
+				PQfinish(conn);
+				return;
+			}
+		}
+		else
+			Assert(user_list != NULL);
+	}
+
+	slots = ParallelSlotsSetup(dbname, host, port, username, prompt_password,
+							   progname, echo, conn, concurrentCons);
+
+	cell = process_list->head;
+	do
+	{
+		const char *objname = cell->val;
+		ParallelSlot *free_slot = NULL;
+
+		if (CancelRequested)
+		{
+			failed = true;
+			goto finish;
+		}
+
+		free_slot = ParallelSlotsGetIdle(slots, concurrentCons);
+		if (!free_slot)
+		{
+			failed = true;
+			goto finish;
+		}
+
+		run_reindex_command(free_slot->connection, process_type, objname,
+							echo, verbose, concurrently, true);
+
+		cell = cell->next;
+	} while (cell != NULL);
+
+	if (!ParallelSlotsWaitCompletion(slots, concurrentCons))
+		failed = true;
+
+finish:
+	ParallelSlotsTerminate(slots, concurrentCons);
+	pfree(slots);
+
+	if (failed)
+		exit(1);
+}
+
+static void
+run_reindex_command(PGconn *conn, ReindexType type, const char *name,
+					bool echo, bool verbose, bool concurrently, bool async)
+{
+	PQExpBufferData sql;
+	bool		status;
+
 	/* build the REINDEX query */
 	initPQExpBuffer(&sql);
 
@@ -358,7 +496,17 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
 	/* finish the query */
 	appendPQExpBufferChar(&sql, ';');
 
-	if (!executeMaintenanceCommand(conn, sql.data, echo))
+	if (async)
+	{
+		if (echo)
+			printf("%s\n", sql.data);
+
+		status = PQsendQuery(conn, sql.data) == 1;
+	}
+	else
+		status = executeMaintenanceCommand(conn, sql.data, echo);
+
+	if (!status)
 	{
 		switch (type)
 		{
@@ -383,20 +531,135 @@ reindex_one_database(const char *name, const char *dbname, ReindexType type,
 							 name, PQdb(conn), PQerrorMessage(conn));
 				break;
 		}
-		PQfinish(conn);
-		exit(1);
+		if (!async)
+		{
+			PQfinish(conn);
+			exit(1);
+		}
 	}
 
-	PQfinish(conn);
 	termPQExpBuffer(&sql);
 }
 
+/*
+ * Prepare the list of objects to process by querying the catalogs.
+ *
+ * This function will return a SimpleStringList object containing the entire
+ * list of tables in the given database that should be processed by a parallel
+ * database-wide reindex (excluding system tables), or NULL if there's no such
+ * table.
+ */
+static SimpleStringList *
+get_parallel_object_list(PGconn *conn, ReindexType type,
+						 SimpleStringList *user_list, bool echo)
+{
+	PQExpBufferData catalog_query;
+	PQExpBufferData buf;
+	PGresult   *res;
+	SimpleStringList *tables;
+	int			ntups,
+				i;
+
+	initPQExpBuffer(&catalog_query);
+
+	/*
+	 * The queries here are using a safe search_path, so there's no need to
+	 * fully qualify everything.
+	 */
+	if (type == REINDEX_DATABASE)
+	{
+		Assert(user_list == NULL);
+
+		appendPQExpBuffer(&catalog_query,
+						  "SELECT c.relname, ns.nspname\n"
+						  " FROM pg_catalog.pg_class c\n"
+						  " JOIN pg_catalog.pg_namespace ns"
+						  " ON c.relnamespace = ns.oid\n"
+						  " WHERE ns.nspname != 'pg_catalog'\n"
+						  "   AND c.relkind IN ("
+						  CppAsString2(RELKIND_RELATION) ", "
+						  CppAsString2(RELKIND_MATVIEW) ")\n"
+						  " ORDER BY c.relpages DESC;");
+	}
+	else if (type == REINDEX_SCHEMA)
+	{
+		SimpleStringListCell *cell;
+		bool		nsp_listed = false;
+
+		Assert(user_list != NULL);
+
+		/*
+		 * All the tables from all the listed schemas are grabbed at
+		 * once.
+		 */
+		appendPQExpBuffer(&catalog_query,
+						  "SELECT c.relname, ns.nspname\n"
+						  " FROM pg_catalog.pg_class c\n"
+						  " JOIN pg_catalog.pg_namespace ns"
+						  " ON c.relnamespace = ns.oid\n"
+						  " WHERE c.relkind IN ("
+						  CppAsString2(RELKIND_RELATION) ", "
+						  CppAsString2(RELKIND_MATVIEW) ")\n"
+						  " AND ns.nspname IN (");
+
+		for (cell = user_list->head; cell; cell = cell->next)
+		{
+			const char *nspname = cell->val;
+
+			if (nsp_listed)
+				appendPQExpBuffer(&catalog_query, ", ");
+			else
+				nsp_listed = true;
+
+			appendStringLiteralConn(&catalog_query, nspname, conn);
+
+		}
+
+		appendPQExpBuffer(&catalog_query, ")\n"
+						  " ORDER BY c.relpages DESC;");
+	}
+	else
+		Assert(false);
+
+	res = executeQuery(conn, catalog_query.data, echo);
+	termPQExpBuffer(&catalog_query);
+
+	/*
+	 * If no rows are returned, there are no matching tables, so we are done.
+	 */
+	ntups = PQntuples(res);
+	if (ntups == 0)
+	{
+		PQclear(res);
+		/* conn is closed by the caller; closing it here too would free it twice */
+		return NULL;
+	}
+
+	tables = pg_malloc0(sizeof(SimpleStringList));
+
+	/* Build qualified identifiers for each table */
+	initPQExpBuffer(&buf);
+	for (i = 0; i < ntups; i++)
+	{
+		appendPQExpBufferStr(&buf,
+							 fmtQualifiedId(PQgetvalue(res, i, 1),
+											PQgetvalue(res, i, 0)));
+
+		simple_string_list_append(tables, buf.data);
+		resetPQExpBuffer(&buf);
+	}
+	termPQExpBuffer(&buf);
+	PQclear(res);
+
+	return tables;
+}
+
 static void
 reindex_all_databases(const char *maintenance_db,
 					  const char *host, const char *port,
 					  const char *username, enum trivalue prompt_password,
 					  const char *progname, bool echo, bool quiet, bool verbose,
-					  bool concurrently)
+					  bool concurrently, int concurrentCons)
 {
 	PGconn	   *conn;
 	PGresult   *result;
@@ -423,9 +686,10 @@ reindex_all_databases(const char *maintenance_db,
 		appendPQExpBufferStr(&connstr, "dbname=");
 		appendConnStrVal(&connstr, dbname);
 
-		reindex_one_database(NULL, connstr.data, REINDEX_DATABASE, host,
+		reindex_one_database(connstr.data, REINDEX_DATABASE, NULL, host,
 							 port, username, prompt_password,
-							 progname, echo, verbose, concurrently);
+							 progname, echo, verbose, concurrently,
+							 concurrentCons);
 	}
 	termPQExpBuffer(&connstr);
 
@@ -444,6 +708,7 @@ help(const char *progname)
 	printf(_("  -d, --dbname=DBNAME       database to reindex\n"));
 	printf(_("  -e, --echo                show the commands being sent to the server\n"));
 	printf(_("  -i, --index=INDEX         recreate specific index(es) only\n"));
+	printf(_("  -j, --jobs=NUM            use this many concurrent connections to reindex\n"));
 	printf(_("  -q, --quiet               don't write any messages\n"));
 	printf(_("  -s, --system              reindex system catalogs\n"));
 	printf(_("  -S, --schema=SCHEMA       reindex specific schema(s) only\n"));
diff --git a/src/bin/scripts/t/090_reindexdb.pl b/src/bin/scripts/t/090_reindexdb.pl
index 1af8ab70ad..d058690892 100644
--- a/src/bin/scripts/t/090_reindexdb.pl
+++ b/src/bin/scripts/t/090_reindexdb.pl
@@ -3,7 +3,7 @@ use warnings;
 
 use PostgresNode;
 use TestLib;
-use Test::More tests => 34;
+use Test::More tests => 39;
 
 program_help_ok('reindexdb');
 program_version_ok('reindexdb');
@@ -77,3 +77,27 @@ $node->command_ok(
 $node->command_ok(
 	[qw(reindexdb --echo --system dbname=template1)],
 	'reindexdb system with connection string');
+
+# parallel processing
+$node->safe_psql(
+	'postgres', q|
+	CREATE SCHEMA s1;
+	CREATE TABLE s1.t1(id integer);
+	CREATE INDEX ON s1.t1(id);
+	CREATE SCHEMA s2;
+	CREATE TABLE s2.t2(id integer);
+	CREATE INDEX ON s2.t2(id);
+|);
+
+$node->command_fails(
+	[ 'reindexdb', '-j', '2', '-s', 'postgres' ],
+	'reindexdb cannot process system catalogs in parallel');
+$node->issues_sql_like(
+	[ 'reindexdb', '-j', '2', 'postgres' ],
+	qr/statement:\ REINDEX SYSTEM postgres;
+.*statement:\ REINDEX TABLE public\.test1/s,
+	'global and parallel reindex issues REINDEX SYSTEM');
+$node->issues_sql_like(
+	[ 'reindexdb', '-j', '2', '-S', 's1', '-S', 's2', 'postgres' ],
+	qr/statement:\ REINDEX TABLE s1.t1;/,
+	'Schema-specific and parallel reindex will issue per-table REINDEX');
