*** a/doc/src/sgml/ref/vacuumdb.sgml
--- b/doc/src/sgml/ref/vacuumdb.sgml
***************
*** 204,209 **** PostgreSQL documentation
--- 204,228 ----
       </varlistentry>
  
       <varlistentry>
+       <term><option>-j <replaceable class="parameter">jobs</replaceable></option></term>
+       <term><option>--jobs=<replaceable class="parameter">njobs</replaceable></option></term>
+       <listitem>
+        <para>
+         This option will enable the vacuum operation to run on concurrent
+         connections. Maximum number of tables can be vacuumed concurrently
+         is equal to number of jobs. If number of jobs given is more than
+         number of tables then number of jobs will be set to number of tables.
+        </para>
+        <para>
+         <application>vacuumdb</application> will open
+         <replaceable class="parameter"> njobs</replaceable> connections to the
+         database, so make sure your <xref linkend="guc-max-connections">
+         setting is high enough to accommodate all connections.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
        <term><option>--analyze-in-stages</option></term>
        <listitem>
         <para>
*** a/src/backend/port/win32/socket.c
--- b/src/backend/port/win32/socket.c
***************
*** 42,48 **** int			pgwin32_noblock = 0;
  /*
   * Convert the last socket error code into errno
   */
! static void
  TranslateSocketError(void)
  {
  	switch (WSAGetLastError())
--- 42,48 ----
  /*
   * Convert the last socket error code into errno
   */
! void
  TranslateSocketError(void)
  {
  	switch (WSAGetLastError())
*** a/src/bin/pg_dump/parallel.c
--- b/src/bin/pg_dump/parallel.c
***************
*** 1160,1166 **** select_loop(int maxFd, fd_set *workerset)
  		i = select(maxFd + 1, workerset, NULL, NULL, NULL);
  
  		/*
! 		 * If we Ctrl-C the master process , it's likely that we interrupt
  		 * select() here. The signal handler will set wantAbort == true and
  		 * the shutdown journey starts from here. Note that we'll come back
  		 * here later when we tell all workers to terminate and read their
--- 1160,1166 ----
  		i = select(maxFd + 1, workerset, NULL, NULL, NULL);
  
  		/*
! 		 * If we Ctrl-C the master process, it's likely that we interrupt
  		 * select() here. The signal handler will set wantAbort == true and
  		 * the shutdown journey starts from here. Note that we'll come back
  		 * here later when we tell all workers to terminate and read their
*** a/src/bin/scripts/common.c
--- b/src/bin/scripts/common.c
***************
*** 19,28 ****
  
  #include "common.h"
  
- static void SetCancelConn(PGconn *conn);
- static void ResetCancelConn(void);
  
  static PGcancel *volatile cancelConn = NULL;
  
  #ifdef WIN32
  static CRITICAL_SECTION cancelConnLock;
--- 19,27 ----
  
  #include "common.h"
  
  
  static PGcancel *volatile cancelConn = NULL;
+ bool CancelRequested = false;
  
  #ifdef WIN32
  static CRITICAL_SECTION cancelConnLock;
***************
*** 291,297 **** yesno_prompt(const char *question)
   *
   * Set cancelConn to point to the current database connection.
   */
! static void
  SetCancelConn(PGconn *conn)
  {
  	PGcancel   *oldCancelConn;
--- 290,296 ----
   *
   * Set cancelConn to point to the current database connection.
   */
! void
  SetCancelConn(PGconn *conn)
  {
  	PGcancel   *oldCancelConn;
***************
*** 321,327 **** SetCancelConn(PGconn *conn)
   *
   * Free the current cancel connection, if any, and set to NULL.
   */
! static void
  ResetCancelConn(void)
  {
  	PGcancel   *oldCancelConn;
--- 320,326 ----
   *
   * Free the current cancel connection, if any, and set to NULL.
   */
! void
  ResetCancelConn(void)
  {
  	PGcancel   *oldCancelConn;
***************
*** 345,353 **** ResetCancelConn(void)
  
  #ifndef WIN32
  /*
!  * Handle interrupt signals by canceling the current command,
!  * if it's being executed through executeMaintenanceCommand(),
!  * and thus has a cancelConn set.
   */
  static void
  handle_sigint(SIGNAL_ARGS)
--- 344,351 ----
  
  #ifndef WIN32
  /*
!  * Handle interrupt signals by canceling the current command, if a cancelConn
!  * is set.
   */
  static void
  handle_sigint(SIGNAL_ARGS)
***************
*** 359,368 **** handle_sigint(SIGNAL_ARGS)
--- 357,371 ----
  	if (cancelConn != NULL)
  	{
  		if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
+ 		{
+ 			CancelRequested = true;
  			fprintf(stderr, _("Cancel request sent\n"));
+ 		}
  		else
  			fprintf(stderr, _("Could not send cancel request: %s"), errbuf);
  	}
+ 	else
+ 		CancelRequested = true;
  
  	errno = save_errno;			/* just in case the write changed it */
  }
***************
*** 392,401 **** consoleHandler(DWORD dwCtrlType)
--- 395,410 ----
  		if (cancelConn != NULL)
  		{
  			if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
+ 			{
  				fprintf(stderr, _("Cancel request sent\n"));
+ 				CancelRequested = true;
+ 			}
  			else
  				fprintf(stderr, _("Could not send cancel request: %s"), errbuf);
  		}
+ 		else
+ 			CancelRequested = true;
+ 
  		LeaveCriticalSection(&cancelConnLock);
  
  		return TRUE;
*** a/src/bin/scripts/common.h
--- b/src/bin/scripts/common.h
***************
*** 21,26 **** enum trivalue
--- 21,28 ----
  	TRI_YES
  };
  
+ extern bool CancelRequested;
+ 
  typedef void (*help_handler) (const char *progname);
  
  extern void handle_help_version_opts(int argc, char *argv[],
***************
*** 49,52 **** extern bool yesno_prompt(const char *question);
--- 51,58 ----
  
  extern void setup_cancel_handler(void);
  
+ extern void SetCancelConn(PGconn *conn);
+ extern void ResetCancelConn(void);
+ 
+ 
  #endif   /* COMMON_H */
*** a/src/bin/scripts/vacuumdb.c
--- b/src/bin/scripts/vacuumdb.c
***************
*** 11,34 ****
   */
  
  #include "postgres_fe.h"
  #include "common.h"
  #include "dumputils.h"
  
  
! static void vacuum_one_database(const char *dbname, bool full, bool verbose,
! 	bool and_analyze, bool analyze_only, bool analyze_in_stages, int stage, bool freeze,
! 					const char *table, const char *host, const char *port,
  					const char *username, enum trivalue prompt_password,
! 					const char *progname, bool echo, bool quiet);
! static void vacuum_all_databases(bool full, bool verbose, bool and_analyze,
! 					 bool analyze_only, bool analyze_in_stages, bool freeze,
  					 const char *maintenance_db,
  					 const char *host, const char *port,
  					 const char *username, enum trivalue prompt_password,
  					 const char *progname, bool echo, bool quiet);
  
  static void help(const char *progname);
  
  
  int
  main(int argc, char *argv[])
--- 11,111 ----
   */
  
  #include "postgres_fe.h"
+ 
  #include "common.h"
  #include "dumputils.h"
  
  
! #define ERRCODE_UNDEFINED_TABLE  "42P01"
! 
! /* Parallel vacuuming stuff */
! typedef struct ParallelSlot
! {
! 	PGconn *connection;
! 	pgsocket sock;
! 	bool isFree;
! } ParallelSlot;
! 
! /* vacuum options controlled by user flags */
! typedef struct vacuumingOptions
! {
! 	bool	analyze_only;
! 	bool	verbose;
! 	bool	and_analyze;
! 	bool	full;
! 	bool	freeze;
! } vacuumingOptions;
! 
! 
! static void vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
! 					bool analyze_in_stages, int stage,
! 					SimpleStringList *tables,
! 					const char *host, const char *port,
  					const char *username, enum trivalue prompt_password,
! 					const char *progname, bool echo, bool quiet,
! 					int concurrentCons);
! 
! static void vacuum_all_databases(vacuumingOptions *vacopts,
! 					 bool analyze_in_stages,
  					 const char *maintenance_db,
  					 const char *host, const char *port,
  					 const char *username, enum trivalue prompt_password,
+ 					 int concurrentCons,
  					 const char *progname, bool echo, bool quiet);
+ static void vacuum_database_stage(const char *dbname, vacuumingOptions *vacopts,
+ 					  bool analyze_in_stages, int stage,
+ 					  SimpleStringList *tables,
+ 					  const char *host, const char *port, const char *username,
+ 					  enum trivalue prompt_password,
+ 					  int concurrentCons,
+ 					  const char *progname, bool echo, bool quiet);
  
  static void help(const char *progname);
  
+ static void prepare_command(PQExpBuffer sql, PGconn *conn,
+ 				vacuumingOptions *vacopts, const char *table);
+ 
+ static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots,
+ 			const char *dbname, const char *progname);
+ 
+ static bool GetQueryResult(PGconn *conn, const char *dbname,
+ 			   const char *progname);
+ 
+ static int select_loop(int maxFd, fd_set *workerset, bool *aborting);
+ 
+ static void DisconnectDatabase(ParallelSlot *slot);
+ static void init_slot(ParallelSlot *slot, PGconn *conn);
+ 
+ 
+ 
+ /*
+  * Preparatory commands and corresponding user-visible message for the
+  * analyze-in-stages feature.  Note the ANALYZE command itself must be sent
+  * separately.
+  */
+ static const struct
+ {
+ 	const char *prepcmd;
+ 	const char *message;
+ }
+ staged_analyze[3] =
+ {
+ 	{
+ 		"SET default_statistics_target=1; SET vacuum_cost_delay=0;",
+ 		gettext_noop("Generating minimal optimizer statistics (1 target)")
+ 	},
+ 	{
+ 		"SET default_statistics_target=10; RESET vacuum_cost_delay;",
+ 		gettext_noop("Generating medium optimizer statistics (10 targets)")
+ 	},
+ 	{
+ 		"RESET default_statistics_target;",
+ 		gettext_noop("Generating default (full) optimizer statistics")
+ 	}
+ };
+ 
+ #define ANALYZE_ALL_STAGES	-1
+ 
  
  int
  main(int argc, char *argv[])
***************
*** 49,54 **** main(int argc, char *argv[])
--- 126,132 ----
  		{"table", required_argument, NULL, 't'},
  		{"full", no_argument, NULL, 'f'},
  		{"verbose", no_argument, NULL, 'v'},
+ 		{"jobs", required_argument, NULL, 'j'},
  		{"maintenance-db", required_argument, NULL, 2},
  		{"analyze-in-stages", no_argument, NULL, 3},
  		{NULL, 0, NULL, 0}
***************
*** 57,63 **** main(int argc, char *argv[])
  	const char *progname;
  	int			optindex;
  	int			c;
- 
  	const char *dbname = NULL;
  	const char *maintenance_db = NULL;
  	char	   *host = NULL;
--- 135,140 ----
***************
*** 66,86 **** main(int argc, char *argv[])
  	enum trivalue prompt_password = TRI_DEFAULT;
  	bool		echo = false;
  	bool		quiet = false;
! 	bool		and_analyze = false;
! 	bool		analyze_only = false;
  	bool		analyze_in_stages = false;
- 	bool		freeze = false;
  	bool		alldb = false;
- 	bool		full = false;
- 	bool		verbose = false;
  	SimpleStringList tables = {NULL, NULL};
  
  	progname = get_progname(argv[0]);
  	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
  
  	handle_help_version_opts(argc, argv, "vacuumdb", help);
  
! 	while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fv", long_options, &optindex)) != -1)
  	{
  		switch (c)
  		{
--- 143,165 ----
  	enum trivalue prompt_password = TRI_DEFAULT;
  	bool		echo = false;
  	bool		quiet = false;
! 	vacuumingOptions vacopts;
  	bool		analyze_in_stages = false;
  	bool		alldb = false;
  	SimpleStringList tables = {NULL, NULL};
+ 	int concurrentCons = 0;
+ 	int tbl_count = 0;
+ 
+ 	/* initialize options to all false */
+ 	memset(&vacopts, 0, sizeof(vacopts));
  
  	progname = get_progname(argv[0]);
+ 
  	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
  
  	handle_help_version_opts(argc, argv, "vacuumdb", help);
  
! 	while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fvj:", long_options, &optindex)) != -1)
  	{
  		switch (c)
  		{
***************
*** 109,139 **** main(int argc, char *argv[])
  				dbname = pg_strdup(optarg);
  				break;
  			case 'z':
! 				and_analyze = true;
  				break;
  			case 'Z':
! 				analyze_only = true;
  				break;
  			case 'F':
! 				freeze = true;
  				break;
  			case 'a':
  				alldb = true;
  				break;
  			case 't':
  				simple_string_list_append(&tables, optarg);
  				break;
  			case 'f':
! 				full = true;
  				break;
  			case 'v':
! 				verbose = true;
  				break;
  			case 2:
  				maintenance_db = pg_strdup(optarg);
  				break;
  			case 3:
! 				analyze_in_stages = analyze_only = true;
  				break;
  			default:
  				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
--- 188,230 ----
  				dbname = pg_strdup(optarg);
  				break;
  			case 'z':
! 				vacopts.and_analyze = true;
  				break;
  			case 'Z':
! 				vacopts.analyze_only = true;
  				break;
  			case 'F':
! 				vacopts.freeze = true;
  				break;
  			case 'a':
  				alldb = true;
  				break;
  			case 't':
+ 			{
  				simple_string_list_append(&tables, optarg);
+ 				tbl_count++;
  				break;
+ 			}
  			case 'f':
! 				vacopts.full = true;
  				break;
  			case 'v':
! 				vacopts.verbose = true;
! 				break;
! 			case 'j':
! 				concurrentCons = atoi(optarg);
! 				if (concurrentCons <= 0)
! 				{
! 					fprintf(stderr, _("%s: number of parallel \"jobs\" must be at least 1\n"),
! 							progname);
! 					exit(1);
! 				}
  				break;
  			case 2:
  				maintenance_db = pg_strdup(optarg);
  				break;
  			case 3:
! 				analyze_in_stages = vacopts.analyze_only = true;
  				break;
  			default:
  				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
***************
*** 141,147 **** main(int argc, char *argv[])
  		}
  	}
  
- 
  	/*
  	 * Non-option argument specifies database name as long as it wasn't
  	 * already specified with -d / --dbname
--- 232,237 ----
***************
*** 160,177 **** main(int argc, char *argv[])
  		exit(1);
  	}
  
! 	if (analyze_only)
  	{
! 		if (full)
  		{
! 			fprintf(stderr, _("%s: cannot use the \"full\" option when performing only analyze\n"),
! 					progname);
  			exit(1);
  		}
! 		if (freeze)
  		{
! 			fprintf(stderr, _("%s: cannot use the \"freeze\" option when performing only analyze\n"),
! 					progname);
  			exit(1);
  		}
  		/* allow 'and_analyze' with 'analyze_only' */
--- 250,267 ----
  		exit(1);
  	}
  
! 	if (vacopts.analyze_only)
  	{
! 		if (vacopts.full)
  		{
! 			fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
! 					progname, "full");
  			exit(1);
  		}
! 		if (vacopts.freeze)
  		{
! 			fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
! 					progname, "freeze");
  			exit(1);
  		}
  		/* allow 'and_analyze' with 'analyze_only' */
***************
*** 179,184 **** main(int argc, char *argv[])
--- 269,278 ----
  
  	setup_cancel_handler();
  
+ 	/* Avoid opening extra connections. */
+ 	if (tbl_count && (concurrentCons > tbl_count))
+ 		concurrentCons = tbl_count;
+ 
  	if (alldb)
  	{
  		if (dbname)
***************
*** 194,202 **** main(int argc, char *argv[])
  			exit(1);
  		}
  
! 		vacuum_all_databases(full, verbose, and_analyze, analyze_only, analyze_in_stages, freeze,
! 							 maintenance_db, host, port, username,
! 							 prompt_password, progname, echo, quiet);
  	}
  	else
  	{
--- 288,299 ----
  			exit(1);
  		}
  
! 		vacuum_all_databases(&vacopts,
! 							 analyze_in_stages,
! 							 maintenance_db,
! 							 host, port, username, prompt_password,
! 							 concurrentCons,
! 							 progname, echo, quiet);
  	}
  	else
  	{
***************
*** 210,244 **** main(int argc, char *argv[])
  				dbname = get_user_name_or_exit(progname);
  		}
  
! 		if (tables.head != NULL)
! 		{
! 			SimpleStringListCell *cell;
! 
! 			for (cell = tables.head; cell; cell = cell->next)
! 			{
! 				vacuum_one_database(dbname, full, verbose, and_analyze,
! 									analyze_only, analyze_in_stages, -1,
! 									freeze, cell->val,
! 									host, port, username, prompt_password,
! 									progname, echo, quiet);
! 			}
! 		}
! 		else
! 			vacuum_one_database(dbname, full, verbose, and_analyze,
! 								analyze_only, analyze_in_stages, -1,
! 								freeze, NULL,
! 								host, port, username, prompt_password,
! 								progname, echo, quiet);
  	}
  
  	exit(0);
  }
  
! 
  static void
! run_vacuum_command(PGconn *conn, const char *sql, bool echo, const char *dbname, const char *table, const char *progname)
  {
! 	if (!executeMaintenanceCommand(conn, sql, echo))
  	{
  		if (table)
  			fprintf(stderr, _("%s: vacuuming of table \"%s\" in database \"%s\" failed: %s"),
--- 307,341 ----
  				dbname = get_user_name_or_exit(progname);
  		}
  
! 		vacuum_database_stage(dbname, &vacopts,
! 							  analyze_in_stages, ANALYZE_ALL_STAGES,
! 							  &tables,
! 							  host, port, username, prompt_password,
! 							  concurrentCons,
! 							  progname, echo, quiet);
  	}
  
  	exit(0);
  }
  
! /*
!  * Execute a vacuum/analyze command to the server.
!  *
!  * Result status is checked only if 'async' is false.
!  */
  static void
! run_vacuum_command(PGconn *conn, const char *sql, bool echo,
! 				   const char *dbname, const char *table,
! 				   const char *progname, bool async)
  {
! 	if (async)
! 	{
! 		if (echo)
! 			printf("%s\n", sql);
! 
! 		PQsendQuery(conn, sql);
! 	}
! 	else if (!executeMaintenanceCommand(conn, sql, echo))
  	{
  		if (table)
  			fprintf(stderr, _("%s: vacuuming of table \"%s\" in database \"%s\" failed: %s"),
***************
*** 251,422 **** run_vacuum_command(PGconn *conn, const char *sql, bool echo, const char *dbname,
  	}
  }
  
! 
  static void
! vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyze,
! 	bool analyze_only, bool analyze_in_stages, int stage, bool freeze, const char *table,
  					const char *host, const char *port,
  					const char *username, enum trivalue prompt_password,
! 					const char *progname, bool echo, bool quiet)
  {
  	PQExpBufferData sql;
- 
  	PGconn	   *conn;
! 
! 	initPQExpBuffer(&sql);
  
  	conn = connectDatabase(dbname, host, port, username, prompt_password,
  						   progname, false);
  
! 	if (analyze_only)
  	{
! 		appendPQExpBufferStr(&sql, "ANALYZE");
! 		if (verbose)
! 			appendPQExpBufferStr(&sql, " VERBOSE");
  	}
! 	else
  	{
! 		appendPQExpBufferStr(&sql, "VACUUM");
! 		if (PQserverVersion(conn) >= 90000)
  		{
! 			const char *paren = " (";
! 			const char *comma = ", ";
! 			const char *sep = paren;
  
! 			if (full)
  			{
! 				appendPQExpBuffer(&sql, "%sFULL", sep);
! 				sep = comma;
  			}
! 			if (freeze)
  			{
! 				appendPQExpBuffer(&sql, "%sFREEZE", sep);
! 				sep = comma;
  			}
! 			if (verbose)
  			{
! 				appendPQExpBuffer(&sql, "%sVERBOSE", sep);
! 				sep = comma;
  			}
! 			if (and_analyze)
  			{
! 				appendPQExpBuffer(&sql, "%sANALYZE", sep);
! 				sep = comma;
  			}
- 			if (sep != paren)
- 				appendPQExpBufferStr(&sql, ")");
- 		}
- 		else
- 		{
- 			if (full)
- 				appendPQExpBufferStr(&sql, " FULL");
- 			if (freeze)
- 				appendPQExpBufferStr(&sql, " FREEZE");
- 			if (verbose)
- 				appendPQExpBufferStr(&sql, " VERBOSE");
- 			if (and_analyze)
- 				appendPQExpBufferStr(&sql, " ANALYZE");
  		}
- 	}
- 	if (table)
- 		appendPQExpBuffer(&sql, " %s", table);
- 	appendPQExpBufferStr(&sql, ";");
  
! 	if (analyze_in_stages)
! 	{
! 		const char *stage_commands[] = {
! 			"SET default_statistics_target=1; SET vacuum_cost_delay=0;",
! 			"SET default_statistics_target=10; RESET vacuum_cost_delay;",
! 			"RESET default_statistics_target;"
! 		};
! 		const char *stage_messages[] = {
! 			gettext_noop("Generating minimal optimizer statistics (1 target)"),
! 			gettext_noop("Generating medium optimizer statistics (10 targets)"),
! 			gettext_noop("Generating default (full) optimizer statistics")
! 		};
! 
! 		if (stage == -1)
  		{
! 			int		i;
  
! 			/* Run all stages. */
! 			for (i = 0; i < 3; i++)
  			{
! 				if (!quiet)
  				{
! 					puts(gettext(stage_messages[i]));
! 					fflush(stdout);
  				}
! 				executeCommand(conn, stage_commands[i], progname, echo);
! 				run_vacuum_command(conn, sql.data, echo, dbname, table, progname);
  			}
! 		}
! 		else
  		{
! 			/* Otherwise, we got a stage from vacuum_all_databases(), so run
! 			 * only that one. */
! 			if (!quiet)
  			{
! 				puts(gettext(stage_messages[stage]));
! 				fflush(stdout);
  			}
- 			executeCommand(conn, stage_commands[stage], progname, echo);
- 			run_vacuum_command(conn, sql.data, echo, dbname, table, progname);
  		}
  
  	}
  	else
! 		run_vacuum_command(conn, sql.data, echo, dbname, NULL, progname);
  
- 	PQfinish(conn);
  	termPQExpBuffer(&sql);
  }
  
  
  static void
! vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_only,
! 			 bool analyze_in_stages, bool freeze, const char *maintenance_db,
! 					 const char *host, const char *port,
! 					 const char *username, enum trivalue prompt_password,
  					 const char *progname, bool echo, bool quiet)
  {
  	PGconn	   *conn;
  	PGresult   *result;
  	int			stage;
  
  	conn = connectMaintenanceDatabase(maintenance_db, host, port,
  									  username, prompt_password, progname);
! 	result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", progname, echo);
  	PQfinish(conn);
  
! 	/* If analyzing in stages, then run through all stages.  Otherwise just
! 	 * run once, passing -1 as the stage. */
! 	for (stage = (analyze_in_stages ? 0 : -1);
! 		 stage < (analyze_in_stages ? 3 : 0);
! 		 stage++)
  	{
- 		int			i;
- 
  		for (i = 0; i < PQntuples(result); i++)
  		{
! 			char	   *dbname = PQgetvalue(result, i, 0);
  
! 			if (!quiet)
  			{
! 				printf(_("%s: vacuuming database \"%s\"\n"), progname, dbname);
! 				fflush(stdout);
  			}
  
! 			vacuum_one_database(dbname, full, verbose, and_analyze, analyze_only,
! 								analyze_in_stages, stage,
! 							freeze, NULL, host, port, username, prompt_password,
! 								progname, echo, quiet);
  		}
  	}
  
! 	PQclear(result);
  }
  
  
  static void
  help(const char *progname)
--- 348,914 ----
  	}
  }
  
! /*
!  * vacuum_one_database
!  *
!  * Process tables in the given database.  If the 'tables' list is empty,
!  * process all tables in the database.  Note there is no paralellization here.
!  */
  static void
! vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
! 					bool analyze_in_stages, int stage,
! 					SimpleStringList *tables,
  					const char *host, const char *port,
  					const char *username, enum trivalue prompt_password,
! 					const char *progname, bool echo, bool quiet,
! 					int concurrentCons)
  {
  	PQExpBufferData sql;
  	PGconn	   *conn;
! 	SimpleStringListCell *cell;
! 	ParallelSlot *slots = NULL;
! 	SimpleStringList dbtables = {NULL, NULL};
! 	int			i;
! 	bool result = 0;
  
  	conn = connectDatabase(dbname, host, port, username, prompt_password,
  						   progname, false);
  
! 	initPQExpBuffer(&sql);
! 
! 	/*
! 	 * If a table list is not provided and concurrentCons option is given
! 	 * then we need to vacuum the whole database, prepare the list of tables.
! 	 */
! 	if (concurrentCons && (!tables || !tables->head))
  	{
! 		PQExpBufferData buf;
! 		PGresult *res;
! 		int		ntups;
! 		int		i;
! 
! 		initPQExpBuffer(&buf);
! 
! 		res = executeQuery(conn,
! 				"SELECT c.relname, ns.nspname FROM pg_class c, pg_namespace ns\n"
! 				" WHERE relkind IN (\'r\', \'m\') AND c.relnamespace = ns.oid\n"
! 				" ORDER BY c.relpages DESC",
! 				progname, echo);
! 
! 		ntups = PQntuples(res);
! 		for (i = 0; i < ntups; i++)
! 		{
! 			appendPQExpBuffer(&buf, "%s",
! 							  fmtQualifiedId(PQserverVersion(conn),
! 											 PQgetvalue(res, i, 1),
! 											 PQgetvalue(res, i, 0)));
! 
! 			simple_string_list_append(&dbtables, buf.data);
! 			resetPQExpBuffer(&buf);
! 		}
! 
! 		termPQExpBuffer(&buf);
! 		tables = &dbtables;
! 
! 		/*
! 		 * If there are more connections than vacuumable relations, we don't
! 		 * need to use them all.
! 		 */
! 		if (concurrentCons > ntups)
! 			concurrentCons = ntups;
  	}
! 
! 	if (concurrentCons)
  	{
! 		slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons);
! 		init_slot(slots, conn);
! 
! 		for (i = 1; i < concurrentCons; i++)
  		{
! 			conn = connectDatabase(dbname, host, port, username, prompt_password,
! 								   progname, false);
! 			init_slot(slots + i, conn);
! 		}
! 	}
! 
! 	for (i = 0; i < 3; i++)
! 	{
! 		cell = tables ? tables->head : NULL;
! 
! 		if (analyze_in_stages)
! 		{
! 			int currentStage;
  
! 			if (stage == ANALYZE_ALL_STAGES)
  			{
! 				currentStage = i;
  			}
! 			else
  			{
! 				currentStage = stage;
  			}
! 
! 			if (!quiet)
  			{
! 				puts(gettext(staged_analyze[currentStage].message));
! 				fflush(stdout);
  			}
! 
! 			if (concurrentCons)
  			{
! 				int j;
! 				for (j = 0; j < concurrentCons; j++)
! 				{
! 					executeCommand((slots + j)->connection,
! 						staged_analyze[currentStage].prepcmd, progname, echo);
! 				}
! 			}
! 			else
! 			{
! 				executeCommand(conn, staged_analyze[currentStage].prepcmd, progname, echo);
  			}
  		}
  
! 		do
  		{
! 			const char *tabname;
! 			tabname = cell ? cell->val : NULL;
! 			prepare_command(&sql, conn, vacopts, tabname);
  
! 			if (concurrentCons)
  			{
! 				ParallelSlot *free_slot;
! 
! 				if (CancelRequested)
! 				{
! 					result = -1;
! 					goto finish;
! 				}
! 
! 				/*
! 				 * Get a free slot, waiting until one becomes free if none currently
! 				 * is.
! 				 */
! 				free_slot = GetIdleSlot(slots, concurrentCons, dbname, progname);
! 				if (!free_slot)
  				{
! 					result = -1;
! 					goto finish;
  				}
! 
! 				free_slot->isFree = false;
! 
! 				run_vacuum_command(free_slot->connection, sql.data,
! 								   echo, dbname, cell->val, progname, true);
  			}
! 			else
! 				run_vacuum_command(conn, sql.data, echo, dbname, NULL, progname, false);
! 
! 			if (cell)
! 				cell = cell->next;
! 		} while (cell != NULL);
! 
! 		if (concurrentCons)
  		{
! 			int j;
! 
! 			for (j = 0; j < concurrentCons; j++)
  			{
! 				/* wait for all connection to return the results */
! 				if (!GetQueryResult((slots + j)->connection, dbname, progname))
! 					goto finish;
! 
! 				(slots + j)->isFree = true;	/* XXX what's the point? */
  			}
  		}
  
+ 		if (!analyze_in_stages || stage != ANALYZE_ALL_STAGES)
+ 			break;
+ 	}
+ 
+ finish:
+ 	if (concurrentCons)
+ 	{
+ 		for (i = 0; i < concurrentCons; i++)
+ 			DisconnectDatabase(&slots[i]);
+ 
+ 		pfree(slots);
  	}
  	else
! 		PQfinish(conn);
  
  	termPQExpBuffer(&sql);
+ 
+ 	if (result == -1)
+ 		exit(1);
  }
  
+ static void
+ init_slot(ParallelSlot *slot, PGconn *conn)
+ {
+ 	slot->connection = conn;
+ 	slot->isFree = true;
+ 	slot->sock = PQsocket(conn);
+ }
  
  static void
! vacuum_all_databases(vacuumingOptions *vacopts,
! 					 bool analyze_in_stages,
! 					 const char *maintenance_db, const char *host,
! 					 const char *port, const char *username,
! 					 enum trivalue prompt_password,
! 					 int concurrentCons,
  					 const char *progname, bool echo, bool quiet)
  {
  	PGconn	   *conn;
  	PGresult   *result;
  	int			stage;
+ 	int			i;
  
  	conn = connectMaintenanceDatabase(maintenance_db, host, port,
  									  username, prompt_password, progname);
! 	result = executeQuery(conn,
! 						  "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;",
! 						  progname, echo);
  	PQfinish(conn);
  
! 	if (analyze_in_stages)
! 	{
! 		for (stage = 0; stage < 3; stage++)
! 		{
! 			for (i = 0; i < PQntuples(result); i++)
! 			{
! 				const char *dbname;
! 
! 				dbname = PQgetvalue(result, i, 0);
! 				vacuum_database_stage(dbname, vacopts,
! 									  analyze_in_stages, stage,
! 									  NULL,
! 									  host, port, username, prompt_password,
! 									  concurrentCons,
! 									  progname, echo, quiet);
! 			}
! 		}
! 	}
! 	else
  	{
  		for (i = 0; i < PQntuples(result); i++)
  		{
! 			const char *dbname;
! 
! 			dbname = PQgetvalue(result, i, 0);
! 			vacuum_database_stage(dbname, vacopts,
! 								  analyze_in_stages, ANALYZE_ALL_STAGES,
! 								  NULL,
! 								  host, port, username, prompt_password,
! 								  concurrentCons,
! 								  progname, echo, quiet);
! 		}
! 	}
  
! 	PQclear(result);
! }
! 
! static void
! vacuum_database_stage(const char *dbname, vacuumingOptions *vacopts,
! 					  bool analyze_in_stages, int stage,
! 					  SimpleStringList *tables,
! 					  const char *host, const char *port, const char *username,
! 					  enum trivalue prompt_password,
! 					  int concurrentCons,
! 					  const char *progname, bool echo, bool quiet)
! {
! 	if (!quiet)
! 	{
! 		printf(_("%s: vacuuming database \"%s\"\n"), progname, dbname);
! 		fflush(stdout);
! 	}
! 
! 	vacuum_one_database(dbname, vacopts,
! 						analyze_in_stages, stage,
! 						tables,
! 						host, port, username, prompt_password,
! 						progname, echo, quiet, concurrentCons);
! }
! 
! /*
!  * GetIdleSlot
!  *		Return a connection slot that is ready to execute a command.
!  *
!  * We return the first slot we find that is marked isFree, if one is;
!  * otherwise, we loop on select() until one socket becomes available.  When
!  * this happens, we read the whole set and mark as free all sockets that become
!  * available.
!  *
!  * Process the slot list, if any free slot is available then return the slotid
!  * else perform the select on all the socket's and wait until at least one slot
!  * becomes available.
!  *
!  * If an error occurs, NULL is returned.
!  */
! static ParallelSlot *
! GetIdleSlot(ParallelSlot slots[], int numslots, const char *dbname,
! 			const char *progname)
! {
! 	int		i;
! 	int		firstFree = -1;
! 	fd_set	slotset;
! 	pgsocket maxFd;
! 
! 	for (i = 0; i < numslots; i++)
! 		if ((slots + i)->isFree)
! 			return slots + i;
! 
! 	FD_ZERO(&slotset);
! 
! 	maxFd = slots->sock;
! 	for (i = 0; i < numslots; i++)
! 	{
! 		FD_SET((slots + i)->sock, &slotset);
! 		if ((slots + i)->sock > maxFd)
! 			maxFd = (slots + i)->sock;
! 	}
! 
! 	/*
! 	 * No free slot found, so wait until one of the connections has finished
! 	 * its task and return the available slot.
! 	 */
! 	for (firstFree = -1; firstFree < 0; )
! 	{
! 		bool	aborting;
! 
! 		SetCancelConn(slots->connection);
! 		i = select_loop(maxFd, &slotset, &aborting);
! 		ResetCancelConn();
! 
! 		if (aborting)
! 		{
! 			/*
! 			 * We set the cancel-receiving connection to the one in the zeroth
! 			 * slot above, so fetch the error from there.
! 			 */
! 			GetQueryResult(slots->connection, dbname, progname);
! 			return NULL;
! 		}
! 		Assert(i != 0);
! 
! 		for (i = 0; i < numslots; i++)
! 		{
! 			if (!FD_ISSET((slots + i)->sock, &slotset))
! 				continue;
! 
! 			PQconsumeInput((slots + i)->connection);
! 			if (PQisBusy((slots + i)->connection))
! 				continue;
! 
! 			(slots + i)->isFree = true;
! 
! 			if (!GetQueryResult((slots + i)->connection, dbname, progname))
! 				return NULL;
! 
! 			if (firstFree < 0)
! 				firstFree = i;
! 		}
! 	}
! 
! 	return slots + firstFree;
! }
! 
! /*
!  * GetQueryResult
!  *
!  * Process the query result.  Returns true if there's no error, false
!  * otherwise -- but errors about trying to vacuum a missing relation are
!  * reported and subsequently ignored.
!  */
! static bool
! GetQueryResult(PGconn *conn, const char *dbname, const char *progname)
! {
! 	PGresult    *result;
! 
! 	SetCancelConn(conn);
! 	while ((result = PQgetResult(conn)) != NULL)
! 	{
! 		/*
! 		 * If errors are found, report them.  Errors about a missing table are
! 		 * harmless so we continue processing; but die for other errors.
! 		 */
! 		if (PQresultStatus(result) != PGRES_COMMAND_OK)
! 		{
! 			char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
! 
! 			fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
! 					progname, dbname, PQerrorMessage(conn));
! 
! 			if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
  			{
! 				PQclear(result);
! 				return false;
  			}
+ 		}
+ 
+ 		PQclear(result);
+ 	}
+ 	ResetCancelConn();
+ 
+ 	return true;
+ }
  
! /*
!  * Loop on select() until a descriptor from the given set becomes readable.
!  *
!  * If we get a cancel request while we're waiting, we forego all further
!  * processing and set the *aborting flag to true.  The return value must be
!  * ignored in this case.  Otherwise, *aborting is set to false.
!  */
! static int
! select_loop(int maxFd, fd_set *workerset, bool *aborting)
! {
! 	int			i;
! 	fd_set		saveSet = *workerset;
! 
! 	if (CancelRequested)
! 	{
! 		*aborting = true;
! 		return -1;
! 	}
! 	else
! 		*aborting = false;
! 
! 	for (;;)
! 	{
! 		/*
! 		 * On Windows, we need to check once in a while for cancel requests; on
! 		 * other platforms we rely on select() returning when interrupted.
! 		 */
! 		struct timeval *tvp;
! #ifdef WIN32
! 		struct timeval tv = {0, 1000000};
! 
! 		tvp = &tv;
! #else
! 		tvp = NULL;
! #endif
! 
! 		*workerset = saveSet;
! 		i = select(maxFd + 1, workerset, NULL, NULL, tvp);
! 
! #ifdef WIN32
! 		if (i == SOCKET_ERROR)
! 		{
! 			i = -1;
! 
! 			if (WSAGetLastError() == WSAEINTR)
! 				errno == EINTR;
  		}
+ #endif
+ 
+ 		if (i < 0 && errno == EINTR)
+ 			continue;			/* ignore this */
+ 		if (i < 0 || CancelRequested)
+ 			*aborting = true;	/* but not this */
+ 		if (i == 0)
+ 			continue;			/* timeout (Win32 only) */
+ 		break;
  	}
  
! 	return i;
  }
  
+ /*
+  * DisconnectDatabase
+  *		Disconnect the connection associated with the given slot
+  */
+ static void
+ DisconnectDatabase(ParallelSlot *slot)
+ {
+ 	char		errbuf[256];
+ 
+ 	if (!slot->connection)
+ 		return;
+ 
+ 	if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE)
+ 	{
+ 		PGcancel   *cancel;
+ 
+ 		if ((cancel = PQgetCancel(slot->connection)))
+ 		{
+ 			PQcancel(cancel, errbuf, sizeof(errbuf));
+ 			PQfreeCancel(cancel);
+ 		}
+ 	}
+ 
+ 	PQfinish(slot->connection);
+ 	slot->connection = NULL;
+ }
+ 
+ /*
+  * Construct a vacuum/analyze command to run based on the given options, in the
+  * given string buffer, which may contain previous garbage.
+  *
+  * An optional table name can be passed; this must be already be properly
+  * quoted.  The command is semicolon-terminated.
+  */
+ static void
+ prepare_command(PQExpBuffer sql, PGconn *conn, vacuumingOptions *vacopts,
+ 				const char *table)
+ {
+ 	resetPQExpBuffer(sql);
+ 
+ 	if (vacopts->analyze_only)
+ 	{
+ 		appendPQExpBufferStr(sql, "ANALYZE");
+ 		if (vacopts->verbose)
+ 			appendPQExpBufferStr(sql, " VERBOSE");
+ 	}
+ 	else
+ 	{
+ 		appendPQExpBufferStr(sql, "VACUUM");
+ 		if (PQserverVersion(conn) >= 90000)
+ 		{
+ 			const char *paren = " (";
+ 			const char *comma = ", ";
+ 			const char *sep = paren;
+ 
+ 			if (vacopts->full)
+ 			{
+ 				appendPQExpBuffer(sql, "%sFULL", sep);
+ 				sep = comma;
+ 			}
+ 			if (vacopts->freeze)
+ 			{
+ 				appendPQExpBuffer(sql, "%sFREEZE", sep);
+ 				sep = comma;
+ 			}
+ 			if (vacopts->verbose)
+ 			{
+ 				appendPQExpBuffer(sql, "%sVERBOSE", sep);
+ 				sep = comma;
+ 			}
+ 			if (vacopts->and_analyze)
+ 			{
+ 				appendPQExpBuffer(sql, "%sANALYZE", sep);
+ 				sep = comma;
+ 			}
+ 			if (sep != paren)
+ 				appendPQExpBufferStr(sql, ")");
+ 		}
+ 		else
+ 		{
+ 			if (vacopts->full)
+ 				appendPQExpBufferStr(sql, " FULL");
+ 			if (vacopts->freeze)
+ 				appendPQExpBufferStr(sql, " FREEZE");
+ 			if (vacopts->verbose)
+ 				appendPQExpBufferStr(sql, " VERBOSE");
+ 			if (vacopts->and_analyze)
+ 				appendPQExpBufferStr(sql, " ANALYZE");
+ 		}
+ 	}
+ 
+ 	if (table)
+ 		appendPQExpBuffer(sql, " %s;", table);
+ }
  
  static void
  help(const char *progname)
***************
*** 436,441 **** help(const char *progname)
--- 928,934 ----
  	printf(_("  -V, --version                   output version information, then exit\n"));
  	printf(_("  -z, --analyze                   update optimizer statistics\n"));
  	printf(_("  -Z, --analyze-only              only update optimizer statistics\n"));
+ 	printf(_("  -j, --jobs=NUM                  use this many concurrent connections to vacuum\n"));
  	printf(_("      --analyze-in-stages         only update optimizer statistics, in multiple\n"
  		   "                                  stages for faster results\n"));
  	printf(_("  -?, --help                      show this help, then exit\n"));
***************
*** 449,451 **** help(const char *progname)
--- 942,945 ----
  	printf(_("\nRead the description of the SQL command VACUUM for details.\n"));
  	printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
  }
+ 
