From 1bec0089809f9ba04b95b993caeefff068326c2d Mon Sep 17 00:00:00 2001
From: ThalorMahendra <mahi6run@gmail.com>
Date: Mon, 3 Nov 2025 17:17:09 +0530
Subject: [PATCH] Non text modes for pg_dumpall, correspondingly change 
 pg_restore

    pg_dumpall acquires a new -F/--format option, with the same meaning as
    in pg_dump.  The default is p, meaning plain text.  For any other value,
    a directory is created containing two files, toc.dat/.dmp/.tar and
    map.dat.  The first contains the commands that restore the global data,
    in the format selected by -F, and the second contains a map from oids
    to database names.  The directory also contains a subdirectory called
    databases, inside which archives in the specified format are created,
    one per database, named using the database oids.

    In these cases the -f argument is required.
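
    As an illustration (a sketch only; the path shown is an example, and the
    exact contents depend on the cluster), a directory-format dump could be
    taken like this:

        pg_dumpall -F d -f /tmp/cluster_dump

    after which /tmp/cluster_dump contains toc.dat (the global commands),
    map.dat (the oid-to-name map), and a databases/ subdirectory with one
    archive per database, named by oid.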

    If pg_restore encounters a directory containing map.dat, it restores
    the global settings from toc.dat/.dmp/.tar, if present, and then
    restores each database.

    pg_restore acquires two new options: -g/--globals-only, which suppresses
    restoration of any databases, and --exclude-database, which inhibits
    restoration of particular database(s) in the same way the option of the
    same name works in pg_dumpall.
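
    For example (again a sketch; connection options and paths are
    illustrative), a whole-cluster restore, a globals-only restore, and a
    restore that skips some databases could be run as:

        pg_restore -C -d postgres /tmp/cluster_dump
        pg_restore -g -d postgres /tmp/cluster_dump
        pg_restore -C -d postgres --exclude-database='test*' /tmp/cluster_dump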

v05
---
 doc/src/sgml/ref/pg_dumpall.sgml     |  89 +++-
 doc/src/sgml/ref/pg_restore.sgml     |  66 ++-
 src/bin/pg_dump/connectdb.c          |   1 -
 src/bin/pg_dump/meson.build          |   1 +
 src/bin/pg_dump/parallel.c           |  10 +
 src/bin/pg_dump/pg_backup.h          |   2 +-
 src/bin/pg_dump/pg_backup_archiver.c |  31 +-
 src/bin/pg_dump/pg_backup_archiver.h |   1 +
 src/bin/pg_dump/pg_backup_tar.c      |   2 +-
 src/bin/pg_dump/pg_dump.c            |   2 +-
 src/bin/pg_dump/pg_dumpall.c         | 608 ++++++++++++++++++++++-----
 src/bin/pg_dump/pg_restore.c         | 593 +++++++++++++++++++++++++-
 src/bin/pg_dump/t/001_basic.pl       |  10 +
 src/bin/pg_dump/t/007_pg_dumpall.pl  | 396 +++++++++++++++++
 14 files changed, 1666 insertions(+), 146 deletions(-)
 mode change 100644 => 100755 src/bin/pg_dump/t/001_basic.pl
 create mode 100755 src/bin/pg_dump/t/007_pg_dumpall.pl

diff --git a/doc/src/sgml/ref/pg_dumpall.sgml b/doc/src/sgml/ref/pg_dumpall.sgml
index 9f639f61db0..4063e88d388 100644
--- a/doc/src/sgml/ref/pg_dumpall.sgml
+++ b/doc/src/sgml/ref/pg_dumpall.sgml
@@ -16,7 +16,10 @@ PostgreSQL documentation
 
  <refnamediv>
   <refname>pg_dumpall</refname>
-  <refpurpose>extract a <productname>PostgreSQL</productname> database cluster into a script file</refpurpose>
+
+  <refpurpose>
+   export a <productname>PostgreSQL</productname> database cluster as an SQL script or to other formats
+  </refpurpose>
  </refnamediv>
 
  <refsynopsisdiv>
@@ -33,7 +36,7 @@ PostgreSQL documentation
   <para>
    <application>pg_dumpall</application> is a utility for writing out
    (<quote>dumping</quote>) all <productname>PostgreSQL</productname> databases
-   of a cluster into one script file.  The script file contains
+   of a cluster into an SQL script file or an archive.  The output contains
    <acronym>SQL</acronym> commands that can be used as input to <xref
    linkend="app-psql"/> to restore the databases.  It does this by
    calling <xref linkend="app-pgdump"/> for each database in the cluster.
@@ -52,11 +55,16 @@ PostgreSQL documentation
   </para>
 
   <para>
-   The SQL script will be written to the standard output.  Use the
+   Plain text SQL scripts will be written to the standard output.  Use the
    <option>-f</option>/<option>--file</option> option or shell operators to
    redirect it into a file.
   </para>
 
+  <para>
+   Archives in other formats will be placed in a directory named by the
+   <option>-f</option>/<option>--file</option> option, which is required in this case.
+  </para>
+
   <para>
   <application>pg_dumpall</application> needs to connect several
   times to the <productname>PostgreSQL</productname> server (once per
@@ -131,10 +139,85 @@ PostgreSQL documentation
        <para>
         Send output to the specified file.  If this is omitted, the
         standard output is used.
+        Note: This option can be omitted only when <option>--format</option> is <literal>plain</literal>.
        </para>
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>-F <replaceable class="parameter">format</replaceable></option></term>
+      <term><option>--format=<replaceable class="parameter">format</replaceable></option></term>
+      <listitem>
+       <para>
+        Specify the format of the dump files.  In plain format, all the dump
+        data is sent in a single text stream.  This is the default.
+
+        In all other formats, <application>pg_dumpall</application> first creates two files:
+        <filename>toc.dat</filename>/<filename>toc.dmp</filename>/<filename>toc.tar</filename> and <filename>map.dat</filename>, in the directory
+        specified by <option>--file</option>.
+        The first file contains global data, such as roles and tablespaces.  The
+        second contains a mapping between database oids and names.  These files
+        are used by <application>pg_restore</application>.  Data for individual databases is placed in the
+        <filename>databases</filename> subdirectory, with each dump named using the database's <type>oid</type>.
+
+       <variablelist>
+        <varlistentry>
+         <term><literal>d</literal></term>
+         <term><literal>directory</literal></term>
+         <listitem>
+          <para>
+           Output a directory-format archive for each database,
+           suitable for input into <application>pg_restore</application>.  The
+           directory will be named using the database's <type>oid</type>.
+          </para>
+         </listitem>
+        </varlistentry>
+
+        <varlistentry>
+         <term><literal>p</literal></term>
+         <term><literal>plain</literal></term>
+         <listitem>
+          <para>
+           Output a plain-text SQL script file (the default).
+          </para>
+         </listitem>
+        </varlistentry>
+
+        <varlistentry>
+         <term><literal>c</literal></term>
+         <term><literal>custom</literal></term>
+         <listitem>
+          <para>
+           Output a custom-format archive for each database,
+           suitable for input into <application>pg_restore</application>. The archive
+           will be named <filename>dboid.dmp</filename> where <type>dboid</type> is the
+           <type>oid</type> of the database.
+          </para>
+         </listitem>
+        </varlistentry>
+
+        <varlistentry>
+         <term><literal>t</literal></term>
+         <term><literal>tar</literal></term>
+         <listitem>
+          <para>
+           Output a tar-format archive for each database,
+           suitable for input into <application>pg_restore</application>. The archive
+           will be named <filename>dboid.tar</filename> where <type>dboid</type> is the
+           <type>oid</type> of the database.
+          </para>
+         </listitem>
+        </varlistentry>
+
+        </variablelist>
+
+       Note: see <xref linkend="app-pgdump"/> for details
+       of how the various non-plain-text archive formats work.
+
+        </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-g</option></term>
       <term><option>--globals-only</option></term>
diff --git a/doc/src/sgml/ref/pg_restore.sgml b/doc/src/sgml/ref/pg_restore.sgml
index a468a38361a..7497b527ae6 100644
--- a/doc/src/sgml/ref/pg_restore.sgml
+++ b/doc/src/sgml/ref/pg_restore.sgml
@@ -18,8 +18,9 @@ PostgreSQL documentation
   <refname>pg_restore</refname>
 
   <refpurpose>
-   restore a <productname>PostgreSQL</productname> database from an
-   archive file created by <application>pg_dump</application>
+   restore <productname>PostgreSQL</productname> databases from archives
+   created by <application>pg_dump</application> or
+   <application>pg_dumpall</application>
   </refpurpose>
  </refnamediv>
 
@@ -38,13 +39,14 @@ PostgreSQL documentation
 
   <para>
    <application>pg_restore</application> is a utility for restoring a
-   <productname>PostgreSQL</productname> database from an archive
-   created by <xref linkend="app-pgdump"/> in one of the non-plain-text
+   <productname>PostgreSQL</productname> database or cluster from an archive
+   created by <xref linkend="app-pgdump"/> or
+   <xref linkend="app-pg-dumpall"/> in one of the non-plain-text
    formats.  It will issue the commands necessary to reconstruct the
-   database to the state it was in at the time it was saved.  The
-   archive files also allow <application>pg_restore</application> to
+   database or cluster to the state it was in at the time it was saved. The
+   archives also allow <application>pg_restore</application> to
    be selective about what is restored, or even to reorder the items
-   prior to being restored. The archive files are designed to be
+   prior to being restored. The archive formats are designed to be
    portable across architectures.
   </para>
 
@@ -52,10 +54,17 @@ PostgreSQL documentation
    <application>pg_restore</application> can operate in two modes.
    If a database name is specified, <application>pg_restore</application>
    connects to that database and restores archive contents directly into
-   the database.  Otherwise, a script containing the SQL
-   commands necessary to rebuild the database is created and written
+   the database.
+   When restoring from a dump made by <application>pg_dumpall</application>,
+   each database will be created and then the restoration will be run in that
+   database.
+
+   Otherwise, when a database name is not specified, a script containing the SQL
+   commands necessary to rebuild the database or cluster is created and written
    to a file or standard output.  This script output is equivalent to
-   the plain text output format of <application>pg_dump</application>.
+   the plain text output format of <application>pg_dump</application> or
+   <application>pg_dumpall</application>.
+
    Some of the options controlling the output are therefore analogous to
    <application>pg_dump</application> options.
   </para>
@@ -152,6 +161,8 @@ PostgreSQL documentation
         commands that mention this database.
         Access privileges for the database itself are also restored,
         unless <option>--no-acl</option> is specified.
+        <option>--create</option> is required when restoring multiple databases
+        from an archive created by <application>pg_dumpall</application>.
        </para>
 
        <para>
@@ -247,6 +258,19 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>-g</option></term>
+      <term><option>--globals-only</option></term>
+      <listitem>
+       <para>
+        Restore only global objects (roles and tablespaces), no databases.
+       </para>
+       <para>
+        This option is only relevant when restoring from an archive made using <application>pg_dumpall</application>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-I <replaceable class="parameter">index</replaceable></option></term>
       <term><option>--index=<replaceable class="parameter">index</replaceable></option></term>
@@ -591,6 +615,28 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--exclude-database=<replaceable class="parameter">pattern</replaceable></option></term>
+      <listitem>
+       <para>
+        Do not restore databases whose name matches
+        <replaceable class="parameter">pattern</replaceable>.
+        Multiple patterns can be excluded by writing multiple
+        <option>--exclude-database</option> switches.  The
+        <replaceable class="parameter">pattern</replaceable> parameter is
+        interpreted as a pattern according to the same rules used by
+        <application>psql</application>'s <literal>\d</literal>
+        commands (see <xref linkend="app-psql-patterns"/>),
+        so multiple databases can also be excluded by writing wildcard
+        characters in the pattern.  When using wildcards, be careful to
+        quote the pattern if needed to prevent shell wildcard expansion.
+       </para>
+       <para>
+        This option is only relevant when restoring from an archive made using <application>pg_dumpall</application>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>--filter=<replaceable class="parameter">filename</replaceable></option></term>
       <listitem>
diff --git a/src/bin/pg_dump/connectdb.c b/src/bin/pg_dump/connectdb.c
index d55d53dbeea..f44a8a45fca 100644
--- a/src/bin/pg_dump/connectdb.c
+++ b/src/bin/pg_dump/connectdb.c
@@ -287,7 +287,6 @@ executeQuery(PGconn *conn, const char *query)
 	{
 		pg_log_error("query failed: %s", PQerrorMessage(conn));
 		pg_log_error_detail("Query was: %s", query);
-		PQfinish(conn);
 		exit_nicely(1);
 	}
 
diff --git a/src/bin/pg_dump/meson.build b/src/bin/pg_dump/meson.build
index f3c669f484e..3e21aaf5780 100644
--- a/src/bin/pg_dump/meson.build
+++ b/src/bin/pg_dump/meson.build
@@ -103,6 +103,7 @@ tests += {
       't/004_pg_dump_parallel.pl',
       't/005_pg_dump_filterfile.pl',
       't/006_pg_dump_compress.pl',
+      't/007_pg_dumpall.pl',
       't/010_dump_connstr.pl',
     ],
   },
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 086adcdc502..5974d6706fd 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -333,6 +333,16 @@ on_exit_close_archive(Archive *AHX)
 	on_exit_nicely(archive_close_connection, &shutdown_info);
 }
 
+/*
+ * When pg_restore restores multiple databases, update the archive handle in
+ * the already-registered shutdown_info entry used for cleanup.
+ */
+void
+replace_on_exit_close_archive(Archive *AHX)
+{
+	shutdown_info.AHX = AHX;
+}
+
 /*
  * on_exit_nicely handler for shutting down database connections and
  * worker processes cleanly.
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index d9041dad720..f631d945472 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -312,7 +312,7 @@ extern void SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ro
 
 extern void ProcessArchiveRestoreOptions(Archive *AHX);
 
-extern void RestoreArchive(Archive *AHX);
+extern void RestoreArchive(Archive *AHX, bool append_data, bool globals_only);
 
 /* Open an existing archive */
 extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 59eaecb4ed7..e4cfa9a963a 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -86,7 +86,7 @@ static int	RestoringToDB(ArchiveHandle *AH);
 static void dump_lo_buf(ArchiveHandle *AH);
 static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
 static void SetOutput(ArchiveHandle *AH, const char *filename,
-					  const pg_compress_specification compression_spec);
+					  const pg_compress_specification compression_spec, bool append_data);
 static CompressFileHandle *SaveOutput(ArchiveHandle *AH);
 static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput);
 
@@ -339,9 +339,14 @@ ProcessArchiveRestoreOptions(Archive *AHX)
 		StrictNamesCheck(ropt);
 }
 
-/* Public */
+/*
+ * RestoreArchive
+ *
+ * If append_data is set, append output to the file, since we are restoring a
+ * dump of multiple databases taken by pg_dumpall.
+ */
 void
-RestoreArchive(Archive *AHX)
+RestoreArchive(Archive *AHX, bool append_data, bool globals_only)
 {
 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
 	RestoreOptions *ropt = AH->public.ropt;
@@ -458,7 +463,7 @@ RestoreArchive(Archive *AHX)
 	 */
 	sav = SaveOutput(AH);
 	if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
-		SetOutput(AH, ropt->filename, ropt->compression_spec);
+		SetOutput(AH, ropt->filename, ropt->compression_spec, append_data);
 
 	ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
 
@@ -761,6 +766,17 @@ RestoreArchive(Archive *AHX)
 			if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) == 0)
 				continue;		/* ignore if not to be dumped at all */
 
+			/* When only restoring globals, skip DROP DATABASE entries. */
+			if (globals_only && te && te->tag && (strcmp(te->tag, "DROP_DATABASE") == 0))
+				continue;
+
+			/* Without an output file, skip psql-only RESTRICT/UNRESTRICT/CONNECT. */
+			if (!ropt->filename && te && te->tag &&
+					((strcmp(te->tag, "RESTRICT") == 0) ||
+					 (strcmp(te->tag, "UNRESTRICT") == 0) ||
+					 (strcmp(te->tag, "CONNECT") == 0)))
+				continue;
+
 			switch (_tocEntryRestorePass(te))
 			{
 				case RESTORE_PASS_MAIN:
@@ -1316,7 +1332,7 @@ PrintTOCSummary(Archive *AHX)
 
 	sav = SaveOutput(AH);
 	if (ropt->filename)
-		SetOutput(AH, ropt->filename, out_compression_spec);
+		SetOutput(AH, ropt->filename, out_compression_spec, false);
 
 	if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
 				 localtime(&AH->createDate)) == 0)
@@ -1695,7 +1711,8 @@ archprintf(Archive *AH, const char *fmt,...)
 
 static void
 SetOutput(ArchiveHandle *AH, const char *filename,
-		  const pg_compress_specification compression_spec)
+		  const pg_compress_specification compression_spec,
+		  bool append_data)
 {
 	CompressFileHandle *CFH;
 	const char *mode;
@@ -1715,7 +1732,7 @@ SetOutput(ArchiveHandle *AH, const char *filename,
 	else
 		fn = fileno(stdout);
 
-	if (AH->mode == archModeAppend)
+	if (append_data || AH->mode == archModeAppend)
 		mode = PG_BINARY_A;
 	else
 		mode = PG_BINARY_W;
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 325b53fc9bd..365073b3eae 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -394,6 +394,7 @@ struct _tocEntry
 
 extern int	parallel_restore(ArchiveHandle *AH, TocEntry *te);
 extern void on_exit_close_archive(Archive *AHX);
+extern void replace_on_exit_close_archive(Archive *AHX);
 
 extern void warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...) pg_attribute_printf(2, 3);
 
diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c
index b5ba3b46dd9..818b80a9369 100644
--- a/src/bin/pg_dump/pg_backup_tar.c
+++ b/src/bin/pg_dump/pg_backup_tar.c
@@ -826,7 +826,7 @@ _CloseArchive(ArchiveHandle *AH)
 		savVerbose = AH->public.verbose;
 		AH->public.verbose = 0;
 
-		RestoreArchive((Archive *) AH);
+		RestoreArchive((Archive *) AH, false, false);
 
 		SetArchiveOptions((Archive *) AH, savDopt, savRopt);
 
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 47913178a93..00ce946aab1 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -1292,7 +1292,7 @@ main(int argc, char **argv)
 	 * right now.
 	 */
 	if (plainText)
-		RestoreArchive(fout);
+		RestoreArchive(fout, false, false);
 
 	CloseArchive(fout);
 
diff --git a/src/bin/pg_dump/pg_dumpall.c b/src/bin/pg_dump/pg_dumpall.c
index bb451c1bae1..601b9f9738e 100644
--- a/src/bin/pg_dump/pg_dumpall.c
+++ b/src/bin/pg_dump/pg_dumpall.c
@@ -30,6 +30,7 @@
 #include "fe_utils/string_utils.h"
 #include "filter.h"
 #include "getopt_long.h"
+#include "pg_backup_archiver.h"
 
 /* version string we expect back from pg_dump */
 #define PGDUMP_VERSIONSTR "pg_dump (PostgreSQL) " PG_VERSION "\n"
@@ -65,9 +66,10 @@ static void dropTablespaces(PGconn *conn);
 static void dumpTablespaces(PGconn *conn);
 static void dropDBs(PGconn *conn);
 static void dumpUserConfig(PGconn *conn, const char *username);
-static void dumpDatabases(PGconn *conn);
+static void dumpDatabases(PGconn *conn, ArchiveFormat archDumpFormat);
 static void dumpTimestamp(const char *msg);
-static int	runPgDump(const char *dbname, const char *create_opts);
+static int	runPgDump(const char *dbname, const char *create_opts,
+					  char *dbfile, ArchiveFormat archDumpFormat);
 static void buildShSecLabels(PGconn *conn,
 							 const char *catalog_name, Oid objectId,
 							 const char *objtype, const char *objname,
@@ -76,6 +78,9 @@ static void executeCommand(PGconn *conn, const char *query);
 static void expand_dbname_patterns(PGconn *conn, SimpleStringList *patterns,
 								   SimpleStringList *names);
 static void read_dumpall_filters(const char *filename, SimpleStringList *pattern);
+static ArchiveFormat parseDumpFormat(const char *format);
+static int createDumpId(void);
+static void createOneArchiveEntry(const char *query, const char *tag);
 
 static char pg_dump_bin[MAXPGPATH];
 static PQExpBuffer pgdumpopts;
@@ -123,6 +128,13 @@ static SimpleStringList database_exclude_patterns = {NULL, NULL};
 static SimpleStringList database_exclude_names = {NULL, NULL};
 
 static char *restrict_key;
+static Archive *fout = NULL;
+static pg_compress_specification compression_spec = {0};
+static int dumpIdVal = 0;
+static const CatalogId nilCatalogId = {0, 0};
+static ArchiveMode archiveMode = archModeWrite;
+static DataDirSyncMethod sync_method = DATA_DIR_SYNC_METHOD_FSYNC;
+static ArchiveFormat archDumpFormat = archNull;
 
 int
 main(int argc, char *argv[])
@@ -148,6 +160,7 @@ main(int argc, char *argv[])
 		{"password", no_argument, NULL, 'W'},
 		{"no-privileges", no_argument, NULL, 'x'},
 		{"no-acl", no_argument, NULL, 'x'},
+		{"format", required_argument, NULL, 'F'},
 
 		/*
 		 * the following options don't have an equivalent short option letter
@@ -197,6 +210,7 @@ main(int argc, char *argv[])
 	char	   *pgdb = NULL;
 	char	   *use_role = NULL;
 	const char *dumpencoding = NULL;
+	const char *formatName = "p";
 	trivalue	prompt_password = TRI_DEFAULT;
 	bool		data_only = false;
 	bool		globals_only = false;
@@ -208,6 +222,8 @@ main(int argc, char *argv[])
 	int			c,
 				ret;
 	int			optindex;
+	DumpOptions dopt;
+	char        global_path[MAXPGPATH];
 
 	pg_logging_init(argv[0]);
 	pg_logging_set_level(PG_LOG_WARNING);
@@ -246,7 +262,9 @@ main(int argc, char *argv[])
 
 	pgdumpopts = createPQExpBuffer();
 
-	while ((c = getopt_long(argc, argv, "acd:E:f:gh:l:Op:rsS:tU:vwWx", long_options, &optindex)) != -1)
+	InitDumpOptions(&dopt);
+
+	while ((c = getopt_long(argc, argv, "acd:E:f:F:gh:l:Op:rsS:tU:vwWx", long_options, &optindex)) != -1)
 	{
 		switch (c)
 		{
@@ -257,6 +275,7 @@ main(int argc, char *argv[])
 
 			case 'c':
 				output_clean = true;
+				dopt.outputClean = 1;
 				break;
 
 			case 'd':
@@ -274,7 +293,9 @@ main(int argc, char *argv[])
 				appendPQExpBufferStr(pgdumpopts, " -f ");
 				appendShellString(pgdumpopts, filename);
 				break;
-
+			case 'F':
+				formatName = pg_strdup(optarg);
+				break;
 			case 'g':
 				globals_only = true;
 				break;
@@ -314,6 +335,7 @@ main(int argc, char *argv[])
 
 			case 'U':
 				pguser = pg_strdup(optarg);
+				dopt.cparams.username = pg_strdup(optarg);
 				break;
 
 			case 'v':
@@ -429,6 +451,21 @@ main(int argc, char *argv[])
 		exit_nicely(1);
 	}
 
+	/* Get format for dump. */
+	archDumpFormat = parseDumpFormat(formatName);
+
+	/*
+	 * If a non-plain format is specified, a file name is also required as the
+	 * path to the main directory.
+	 */
+	if (archDumpFormat != archNull &&
+		(!filename || strcmp(filename, "") == 0))
+	{
+		pg_log_error("option -F/--format=d|c|t requires option -f/--file");
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit_nicely(1);
+	}
+
 	/*
 	 * If password values are not required in the dump, switch to using
 	 * pg_roles which is equally useful, just more likely to have unrestricted
@@ -489,6 +526,35 @@ main(int argc, char *argv[])
 	if (sequence_data)
 		appendPQExpBufferStr(pgdumpopts, " --sequence-data");
 
+	/*
+	 * For non-plain formats, create the output directory.  Otherwise, open
+	 * the output file if one was given, or use stdout.
+	 */
+	if (archDumpFormat != archNull)
+	{
+		Assert(filename);
+
+		/* Create a new directory, or accept an existing empty one. */
+		create_or_open_dir(filename);
+
+		/* Set the file path for the global SQL commands. */
+		if (archDumpFormat == archCustom)
+			snprintf(global_path, MAXPGPATH, "%s/toc.dmp", filename);
+		else if (archDumpFormat == archTar)
+			snprintf(global_path, MAXPGPATH, "%s/toc.tar", filename);
+		else if (archDumpFormat == archDirectory)
+			snprintf(global_path, MAXPGPATH, "%s", filename);
+	}
+	else if (filename)
+	{
+		OPF = fopen(filename, PG_BINARY_W);
+		if (!OPF)
+			pg_fatal("could not open output file \"%s\": %m",
+					 filename);
+	}
+	else
+		OPF = stdout;
+
 	/*
 	 * If you don't provide a restrict key, one will be appointed for you.
 	 */
@@ -538,19 +604,6 @@ main(int argc, char *argv[])
 	expand_dbname_patterns(conn, &database_exclude_patterns,
 						   &database_exclude_names);
 
-	/*
-	 * Open the output file if required, otherwise use stdout
-	 */
-	if (filename)
-	{
-		OPF = fopen(filename, PG_BINARY_W);
-		if (!OPF)
-			pg_fatal("could not open output file \"%s\": %m",
-					 filename);
-	}
-	else
-		OPF = stdout;
-
 	/*
 	 * Set the client encoding if requested.
 	 */
@@ -585,37 +638,115 @@ main(int argc, char *argv[])
 	if (quote_all_identifiers)
 		executeCommand(conn, "SET quote_all_identifiers = true");
 
-	fprintf(OPF, "--\n-- PostgreSQL database cluster dump\n--\n\n");
 	if (verbose)
 		dumpTimestamp("Started on");
 
-	/*
-	 * Enter restricted mode to block any unexpected psql meta-commands.  A
-	 * malicious source might try to inject a variety of things via bogus
-	 * responses to queries.  While we cannot prevent such sources from
-	 * affecting the destination at restore time, we can block psql
-	 * meta-commands so that the client machine that runs psql with the dump
-	 * output remains unaffected.
-	 */
-	fprintf(OPF, "\\restrict %s\n\n", restrict_key);
+	/* Create an archive file for the global commands. */
+	if (filename && archDumpFormat != archNull)
+	{
+		/* Open the output file */
+		fout = CreateArchive(global_path, archDumpFormat, compression_spec,
+				dosync, archiveMode, NULL, sync_method);
 
-	/*
-	 * We used to emit \connect postgres here, but that served no purpose
-	 * other than to break things for installations without a postgres
-	 * database.  Everything we're restoring here is a global, so whichever
-	 * database we're connected to at the moment is fine.
-	 */
+		/* Make dump options accessible right away */
+		SetArchiveOptions(fout, &dopt, NULL);
+
+		((ArchiveHandle *) fout)->connection = conn;
+		((ArchiveHandle *) fout)->public.numWorkers = 1;
+
+		/* Register the cleanup hook */
+		on_exit_close_archive(fout);
+
+		/* Let the archiver know how noisy to be */
+		fout->verbose = verbose;
+
+		/*
+		 * We allow the server to be back to 9.2, and up to any minor release of
+		 * our own major version.  (See also version check in pg_dump.c.)
+		 */
+		fout->minRemoteVersion = 90200;
+		fout->maxRemoteVersion = (PG_VERSION_NUM / 100) * 100 + 99;
+		fout->numWorkers = 1;
+
+		createOneArchiveEntry("--\n-- PostgreSQL database cluster dump\n--\n\n", "COMMENT");
+
+		/* create entry for restrict */
+		{
+			PQExpBuffer qry = createPQExpBuffer();
 
-	/* Restore will need to write to the target cluster */
-	fprintf(OPF, "SET default_transaction_read_only = off;\n\n");
+			appendPQExpBuffer(qry, "\\restrict %s\n\n", restrict_key);
+			createOneArchiveEntry(qry->data, "RESTRICT");
+			destroyPQExpBuffer(qry);
+		}
+
+		/* default_transaction_read_only = off */
+		{
+			PQExpBuffer qry = createPQExpBuffer();
+
+			pg_log_info("saving default_transaction_read_only = off");
+			appendPQExpBuffer(qry, "SET default_transaction_read_only = off;\n");
+			createOneArchiveEntry(qry->data, "DEFAULT_TRANSACTION_READ_ONLY");
+			destroyPQExpBuffer(qry);
+		}
+
+		/* dumpEncoding: put the correct encoding into the archive */
+		{
+			PQExpBuffer qry = createPQExpBuffer();
+			const char *encname = pg_encoding_to_char(encoding);
 
-	/* Replicate encoding and std_strings in output */
-	fprintf(OPF, "SET client_encoding = '%s';\n",
-			pg_encoding_to_char(encoding));
-	fprintf(OPF, "SET standard_conforming_strings = %s;\n", std_strings);
-	if (strcmp(std_strings, "off") == 0)
-		fprintf(OPF, "SET escape_string_warning = off;\n");
-	fprintf(OPF, "\n");
+			appendPQExpBufferStr(qry, "SET client_encoding = ");
+			appendStringLiteralAH(qry, encname, fout);
+			appendPQExpBufferStr(qry, ";\n");
+
+			pg_log_info("saving encoding = %s", encname);
+			createOneArchiveEntry(qry->data, "ENCODING");
+			destroyPQExpBuffer(qry);
+		}
+
+		/* dumpStdStrings: put the correct escape string behavior into the archive */
+		{
+			const char *stdstrings = std_strings;
+			PQExpBuffer qry = createPQExpBuffer();
+
+			pg_log_info("saving \"standard_conforming_strings = %s\"", stdstrings);
+			appendPQExpBuffer(qry, "SET standard_conforming_strings = '%s';\n",
+					stdstrings);
+			createOneArchiveEntry(qry->data, "STDSTRINGS");
+			destroyPQExpBuffer(qry);
+		}
+	}
+	else
+	{
+		fprintf(OPF, "--\n-- PostgreSQL database cluster dump\n--\n\n");
+
+		/*
+		 * Enter restricted mode to block any unexpected psql meta-commands.  A
+		 * malicious source might try to inject a variety of things via bogus
+		 * responses to queries.  While we cannot prevent such sources from
+		 * affecting the destination at restore time, we can block psql
+		 * meta-commands so that the client machine that runs psql with the dump
+		 * output remains unaffected.
+		 */
+		fprintf(OPF, "\\restrict %s\n\n", restrict_key);
+
+		/*
+		 * We used to emit \connect postgres here, but that served no purpose
+		 * other than to break things for installations without a postgres
+		 * database.  Everything we're restoring here is a global, so whichever
+		 * database we're connected to at the moment is fine.
+		 */
+
+		/* Restore will need to write to the target cluster */
+		fprintf(OPF, "SET default_transaction_read_only = off;\n\n");
+
+		/* Replicate encoding and std_strings in output */
+		fprintf(OPF, "SET client_encoding = '%s';\n",
+				pg_encoding_to_char(encoding));
+		fprintf(OPF, "SET standard_conforming_strings = %s;\n", std_strings);
+		if (strcmp(std_strings, "off") == 0)
+			fprintf(OPF, "SET escape_string_warning = off;\n");
+		fprintf(OPF, "\n");
+	}
 
 	if (!data_only && !statistics_only && !no_schema)
 	{
@@ -659,27 +790,51 @@ main(int argc, char *argv[])
 			dumpTablespaces(conn);
 	}
 
-	/*
-	 * Exit restricted mode just before dumping the databases.  pg_dump will
-	 * handle entering restricted mode again as appropriate.
-	 */
-	fprintf(OPF, "\\unrestrict %s\n\n", restrict_key);
+	if (archDumpFormat == archNull)
+	{
+		/*
+		 * Exit restricted mode just before dumping the databases.  pg_dump will
+		 * handle entering restricted mode again as appropriate.
+		 */
+		fprintf(OPF, "\\unrestrict %s\n\n", restrict_key);
+	}
+	else
+	{
+		/* create entry for unrestrict */
+		PQExpBuffer qry = createPQExpBuffer();
 
-	if (!globals_only && !roles_only && !tablespaces_only)
-		dumpDatabases(conn);
+		appendPQExpBuffer(qry, "\\unrestrict %s\n\n", restrict_key);
+		createOneArchiveEntry(qry->data, "UNRESTRICT");
+		destroyPQExpBuffer(qry);
+	}
 
-	PQfinish(conn);
+	if (!globals_only && !roles_only && !tablespaces_only)
+		dumpDatabases(conn, archDumpFormat);
 
 	if (verbose)
 		dumpTimestamp("Completed on");
-	fprintf(OPF, "--\n-- PostgreSQL database cluster dump complete\n--\n\n");
 
-	if (filename)
+	if (archDumpFormat == archNull)
+		fprintf(OPF, "--\n-- PostgreSQL database cluster dump complete\n--\n\n");
+
+	if (archDumpFormat != archNull)
+	{
+		RestoreOptions *ropt;
+
+		createOneArchiveEntry("--\n-- PostgreSQL database cluster dump complete\n--\n\n", "COMMENT");
+		ropt = NewRestoreOptions();
+		SetArchiveOptions(fout, &dopt, ropt);
+
+		/* Mark which entries should be output */
+		ProcessArchiveRestoreOptions(fout);
+		CloseArchive(fout);
+	}
+	else if (filename)
 	{
 		fclose(OPF);
 
 		/* sync the resulting file, errors are not fatal */
-		if (dosync)
+		if (dosync && (archDumpFormat == archNull))
 			(void) fsync_fname(filename, false);
 	}
 
@@ -690,12 +845,14 @@ main(int argc, char *argv[])
 static void
 help(void)
 {
-	printf(_("%s exports a PostgreSQL database cluster as an SQL script.\n\n"), progname);
+	printf(_("%s exports a PostgreSQL database cluster as an SQL script or to other formats.\n\n"), progname);
 	printf(_("Usage:\n"));
 	printf(_("  %s [OPTION]...\n"), progname);
 
 	printf(_("\nGeneral options:\n"));
 	printf(_("  -f, --file=FILENAME          output file name\n"));
+	printf(_("  -F, --format=c|d|t|p         output file format (custom, directory, tar,\n"
+			 "                               plain text (default))\n"));
 	printf(_("  -v, --verbose                verbose mode\n"));
 	printf(_("  -V, --version                output version information, then exit\n"));
 	printf(_("  --lock-wait-timeout=TIMEOUT  fail after waiting TIMEOUT for a table lock\n"));
@@ -770,6 +927,7 @@ static void
 dropRoles(PGconn *conn)
 {
 	PQExpBuffer buf = createPQExpBuffer();
+	PQExpBuffer delQry = createPQExpBuffer();
 	PGresult   *res;
 	int			i_rolname;
 	int			i;
@@ -791,7 +949,12 @@ dropRoles(PGconn *conn)
 	i_rolname = PQfnumber(res, "rolname");
 
 	if (PQntuples(res) > 0)
-		fprintf(OPF, "--\n-- Drop roles\n--\n\n");
+	{
+		if (archDumpFormat == archNull)
+			fprintf(OPF, "--\n-- Drop roles\n--\n\n");
+		else
+			createOneArchiveEntry("--\n-- Drop roles\n--\n\n", "COMMENT");
+	}
 
 	for (i = 0; i < PQntuples(res); i++)
 	{
@@ -799,15 +962,21 @@ dropRoles(PGconn *conn)
 
 		rolename = PQgetvalue(res, i, i_rolname);
 
-		fprintf(OPF, "DROP ROLE %s%s;\n",
+		printfPQExpBuffer(delQry, "DROP ROLE %s%s;\n",
 				if_exists ? "IF EXISTS " : "",
 				fmtId(rolename));
+
+		if (archDumpFormat == archNull)
+			fprintf(OPF, "%s", delQry->data);
+		else
+			createOneArchiveEntry(delQry->data, "dropRoles");
 	}
 
 	PQclear(res);
 	destroyPQExpBuffer(buf);
 
-	fprintf(OPF, "\n\n");
+	if (archDumpFormat == archNull)
+		fprintf(OPF, "\n\n");
 }
 
 /*
@@ -889,7 +1058,12 @@ dumpRoles(PGconn *conn)
 	i_is_current_user = PQfnumber(res, "is_current_user");
 
 	if (PQntuples(res) > 0)
-		fprintf(OPF, "--\n-- Roles\n--\n\n");
+	{
+		if (archDumpFormat == archNull)
+			fprintf(OPF, "--\n-- Roles\n--\n\n");
+		else
+			createOneArchiveEntry("--\n-- Roles\n--\n\n", "COMMENT");
+	}
 
 	for (i = 0; i < PQntuples(res); i++)
 	{
@@ -993,7 +1167,10 @@ dumpRoles(PGconn *conn)
 							 "ROLE", rolename,
 							 buf);
 
-		fprintf(OPF, "%s", buf->data);
+		if (archDumpFormat == archNull)
+			fprintf(OPF, "%s", buf->data);
+		else
+			createOneArchiveEntry(buf->data, "dumpRoles");
 	}
 
 	/*
@@ -1001,15 +1178,13 @@ dumpRoles(PGconn *conn)
 	 * We do it this way because config settings for roles could mention the
 	 * names of other roles.
 	 */
-	if (PQntuples(res) > 0)
-		fprintf(OPF, "\n--\n-- User Configurations\n--\n");
-
 	for (i = 0; i < PQntuples(res); i++)
 		dumpUserConfig(conn, PQgetvalue(res, i, i_rolname));
 
 	PQclear(res);
 
-	fprintf(OPF, "\n\n");
+	if (archDumpFormat == archNull)
+		fprintf(OPF, "\n\n");
 
 	destroyPQExpBuffer(buf);
 }
@@ -1088,7 +1263,12 @@ dumpRoleMembership(PGconn *conn)
 	i_set_option = PQfnumber(res, "set_option");
 
 	if (PQntuples(res) > 0)
-		fprintf(OPF, "--\n-- Role memberships\n--\n\n");
+	{
+		if (archDumpFormat == archNull)
+			fprintf(OPF, "--\n-- Role memberships\n--\n\n");
+		else
+			createOneArchiveEntry("--\n-- Role memberships\n--\n\n", "COMMENT");
+	}
 
 	/*
 	 * We can't dump these GRANT commands in arbitrary order, because a role
@@ -1167,6 +1347,7 @@ dumpRoleMembership(PGconn *conn)
 				char	   *grantor;
 				char	   *set_option = "true";
 				bool		found;
+				PQExpBuffer creaQry = createPQExpBuffer();
 
 				/* If we already did this grant, don't do it again. */
 				if (done[i - start])
@@ -1223,8 +1404,8 @@ dumpRoleMembership(PGconn *conn)
 
 				/* Generate the actual GRANT statement. */
 				resetPQExpBuffer(optbuf);
-				fprintf(OPF, "GRANT %s", fmtId(role));
-				fprintf(OPF, " TO %s", fmtId(member));
+				appendPQExpBuffer(creaQry, "GRANT %s", fmtId(role));
+				appendPQExpBuffer(creaQry, " TO %s", fmtId(member));
 				if (*admin_option == 't')
 					appendPQExpBufferStr(optbuf, "ADMIN OPTION");
 				if (dump_grant_options)
@@ -1245,10 +1426,15 @@ dumpRoleMembership(PGconn *conn)
 					appendPQExpBufferStr(optbuf, "SET FALSE");
 				}
 				if (optbuf->data[0] != '\0')
-					fprintf(OPF, " WITH %s", optbuf->data);
+					appendPQExpBuffer(creaQry, " WITH %s", optbuf->data);
 				if (dump_grantors)
-					fprintf(OPF, " GRANTED BY %s", fmtId(grantor));
-				fprintf(OPF, ";\n");
+					appendPQExpBuffer(creaQry, " GRANTED BY %s", fmtId(grantor));
+				appendPQExpBuffer(creaQry, ";\n");
+
+				if (archDumpFormat == archNull)
+					fprintf(OPF, "%s", creaQry->data);
+				else
+					createOneArchiveEntry(creaQry->data, "dumpRoleMembership");
 			}
 		}
 
@@ -1260,7 +1446,8 @@ dumpRoleMembership(PGconn *conn)
 	PQclear(res);
 	destroyPQExpBuffer(buf);
 
-	fprintf(OPF, "\n\n");
+	if (archDumpFormat == archNull)
+		fprintf(OPF, "\n\n");
 }
 
 
@@ -1288,7 +1475,12 @@ dumpRoleGUCPrivs(PGconn *conn)
 					   "ORDER BY 1");
 
 	if (PQntuples(res) > 0)
-		fprintf(OPF, "--\n-- Role privileges on configuration parameters\n--\n\n");
+	{
+		if (archDumpFormat == archNull)
+			fprintf(OPF, "--\n-- Role privileges on configuration parameters\n--\n\n");
+		else
+			createOneArchiveEntry("--\n-- Role privileges on configuration parameters\n--\n\n", "COMMENT");
+	}
 
 	for (i = 0; i < PQntuples(res); i++)
 	{
@@ -1312,14 +1504,19 @@ dumpRoleGUCPrivs(PGconn *conn)
 			exit_nicely(1);
 		}
 
-		fprintf(OPF, "%s", buf->data);
+		if (archDumpFormat == archNull)
+			fprintf(OPF, "%s", buf->data);
+		else
+			createOneArchiveEntry(buf->data, "dumpRoleGUCPrivs");
 
 		free(fparname);
 		destroyPQExpBuffer(buf);
 	}
 
 	PQclear(res);
-	fprintf(OPF, "\n\n");
+
+	if (archDumpFormat == archNull)
+		fprintf(OPF, "\n\n");
 }
 
 
@@ -1331,6 +1528,7 @@ dropTablespaces(PGconn *conn)
 {
 	PGresult   *res;
 	int			i;
+	PQExpBuffer delQry = createPQExpBuffer();
 
 	/*
 	 * Get all tablespaces except built-in ones (which we assume are named
@@ -1342,20 +1540,31 @@ dropTablespaces(PGconn *conn)
 					   "ORDER BY 1");
 
 	if (PQntuples(res) > 0)
-		fprintf(OPF, "--\n-- Drop tablespaces\n--\n\n");
+	{
+		if (archDumpFormat == archNull)
+			fprintf(OPF, "--\n-- Drop tablespaces\n--\n\n");
+		else
+			createOneArchiveEntry("--\n-- Drop tablespaces\n--\n\n", "COMMENT");
+	}
 
 	for (i = 0; i < PQntuples(res); i++)
 	{
 		char	   *spcname = PQgetvalue(res, i, 0);
 
-		fprintf(OPF, "DROP TABLESPACE %s%s;\n",
+		printfPQExpBuffer(delQry, "DROP TABLESPACE %s%s;\n",
 				if_exists ? "IF EXISTS " : "",
 				fmtId(spcname));
+
+		if (archDumpFormat == archNull)
+			fprintf(OPF, "%s", delQry->data);
+		else
+			createOneArchiveEntry(delQry->data, "dropTablespaces");
 	}
 
 	PQclear(res);
 
-	fprintf(OPF, "\n\n");
+	if (archDumpFormat == archNull)
+		fprintf(OPF, "\n\n");
 }
 
 /*
@@ -1382,7 +1591,12 @@ dumpTablespaces(PGconn *conn)
 					   "ORDER BY 1");
 
 	if (PQntuples(res) > 0)
-		fprintf(OPF, "--\n-- Tablespaces\n--\n\n");
+	{
+		if (archDumpFormat == archNull)
+			fprintf(OPF, "--\n-- Tablespaces\n--\n\n");
+		else
+			createOneArchiveEntry("--\n-- Tablespaces\n--\n\n", "COMMENT");
+	}
 
 	for (i = 0; i < PQntuples(res); i++)
 	{
@@ -1451,14 +1665,19 @@ dumpTablespaces(PGconn *conn)
 							 "TABLESPACE", spcname,
 							 buf);
 
-		fprintf(OPF, "%s", buf->data);
+		if (archDumpFormat == archNull)
+			fprintf(OPF, "%s", buf->data);
+		else
+			createOneArchiveEntry(buf->data, "dumpTablespaces");
 
 		free(fspcname);
 		destroyPQExpBuffer(buf);
 	}
 
 	PQclear(res);
-	fprintf(OPF, "\n\n");
+
+	if (archDumpFormat == archNull)
+		fprintf(OPF, "\n\n");
 }
 
 
@@ -1482,7 +1701,12 @@ dropDBs(PGconn *conn)
 					   "ORDER BY datname");
 
 	if (PQntuples(res) > 0)
-		fprintf(OPF, "--\n-- Drop databases (except postgres and template1)\n--\n\n");
+	{
+		if (archDumpFormat == archNull)
+			fprintf(OPF, "--\n-- Drop databases (except postgres and template1)\n--\n\n");
+		else
+			createOneArchiveEntry("--\n-- Drop databases (except postgres and template1)\n--\n\n", "COMMENT");
+	}
 
 	for (i = 0; i < PQntuples(res); i++)
 	{
@@ -1497,15 +1721,23 @@ dropDBs(PGconn *conn)
 			strcmp(dbname, "template0") != 0 &&
 			strcmp(dbname, "postgres") != 0)
 		{
-			fprintf(OPF, "DROP DATABASE %s%s;\n",
+			PQExpBuffer delQry = createPQExpBuffer();
+
+			appendPQExpBuffer(delQry, "DROP DATABASE %s%s;\n",
 					if_exists ? "IF EXISTS " : "",
 					fmtId(dbname));
+
+			if (archDumpFormat == archNull)
+				fprintf(OPF, "%s", delQry->data);
+			else
+				createOneArchiveEntry(delQry->data, "DROP_DATABASE");
 		}
 	}
 
 	PQclear(res);
 
-	fprintf(OPF, "\n\n");
+	if (archDumpFormat == archNull)
+		fprintf(OPF, "\n\n");
 }
 
 
@@ -1532,7 +1764,18 @@ dumpUserConfig(PGconn *conn, const char *username)
 		char	   *sanitized;
 
 		sanitized = sanitize_line(username, true);
-		fprintf(OPF, "\n--\n-- User Config \"%s\"\n--\n\n", sanitized);
+
+		if (archDumpFormat == archNull)
+			fprintf(OPF, "\n--\n-- User Config \"%s\"\n--\n\n", sanitized);
+		else
+		{
+			PQExpBuffer	qry = createPQExpBuffer();
+
+			appendPQExpBuffer(qry, "\n--\n-- User Config \"%s\"\n--\n\n", sanitized);
+			createOneArchiveEntry(qry->data, "COMMENT");
+			destroyPQExpBuffer(qry);
+		}
+
 		free(sanitized);
 	}
 
@@ -1542,7 +1785,11 @@ dumpUserConfig(PGconn *conn, const char *username)
 		makeAlterConfigCommand(conn, PQgetvalue(res, i, 0),
 							   "ROLE", username, NULL, NULL,
 							   buf);
-		fprintf(OPF, "%s", buf->data);
+
+		if (archDumpFormat == archNull)
+			fprintf(OPF, "%s", buf->data);
+		else
+			createOneArchiveEntry(buf->data, "dumpUserConfig");
 	}
 
 	PQclear(res);
@@ -1608,10 +1855,13 @@ expand_dbname_patterns(PGconn *conn,
  * Dump contents of databases.
  */
 static void
-dumpDatabases(PGconn *conn)
+dumpDatabases(PGconn *conn, ArchiveFormat archDumpFormat)
 {
 	PGresult   *res;
 	int			i;
+	char		db_subdir[MAXPGPATH];
+	char		dbfilepath[MAXPGPATH];
+	FILE	   *map_file = NULL;
 
 	/*
 	 * Skip databases marked not datallowconn, since we'd be unable to connect
@@ -1625,19 +1875,48 @@ dumpDatabases(PGconn *conn)
 	 * doesn't have some failure mode with --clean.
 	 */
 	res = executeQuery(conn,
-					   "SELECT datname "
+					   "SELECT datname, oid "
 					   "FROM pg_database d "
 					   "WHERE datallowconn AND datconnlimit != -2 "
 					   "ORDER BY (datname <> 'template1'), datname");
 
 	if (PQntuples(res) > 0)
-		fprintf(OPF, "--\n-- Databases\n--\n\n");
+	{
+		if (archDumpFormat == archNull)
+			fprintf(OPF, "--\n-- Databases\n--\n\n");
+		else
+			createOneArchiveEntry("--\n-- Databases\n--\n\n", "COMMENT");
+	}
+
+	/*
+	 * If a directory/tar/custom format is specified, create a "databases"
+	 * subdirectory under the main directory; pg_dump will create each
+	 * database's dump file or subdirectory inside it.
+	 */
+	if (archDumpFormat != archNull)
+	{
+		char		map_file_path[MAXPGPATH];
+
+		snprintf(db_subdir, MAXPGPATH, "%s/databases", filename);
+
+		/* Create the "databases" subdirectory under the main directory. */
+		if (mkdir(db_subdir, pg_dir_create_mode) != 0)
+			pg_fatal("could not create directory \"%s\": %m", db_subdir);
+
+		snprintf(map_file_path, MAXPGPATH, "%s/map.dat", filename);
+
+		/* Create a map file to record the dboid and dbname of each database. */
+		map_file = fopen(map_file_path, PG_BINARY_W);
+		if (!map_file)
+			pg_fatal("could not open file \"%s\": %m", map_file_path);
+	}
 
 	for (i = 0; i < PQntuples(res); i++)
 	{
 		char	   *dbname = PQgetvalue(res, i, 0);
 		char	   *sanitized;
-		const char *create_opts;
+		char       *oid = PQgetvalue(res, i, 1);
+		const char *create_opts = "";
 		int			ret;
 
 		/* Skip template0, even if it's not marked !datallowconn. */
@@ -1654,7 +1933,18 @@ dumpDatabases(PGconn *conn)
 		pg_log_info("dumping database \"%s\"", dbname);
 
 		sanitized = sanitize_line(dbname, true);
-		fprintf(OPF, "--\n-- Database \"%s\" dump\n--\n\n", sanitized);
+
+		if (archDumpFormat == archNull)
+			fprintf(OPF, "--\n-- Database \"%s\" dump\n--\n\n", sanitized);
+		else
+		{
+			PQExpBuffer qry = createPQExpBuffer();
+
+			appendPQExpBuffer(qry, "--\n-- Database \"%s\" dump\n--\n\n", sanitized);
+			createOneArchiveEntry(qry->data, "COMMENT");
+			destroyPQExpBuffer(qry);
+		}
+
 		free(sanitized);
 
 		/*
@@ -1669,24 +1959,46 @@ dumpDatabases(PGconn *conn)
 		{
 			if (output_clean)
 				create_opts = "--clean --create";
+			/* Since pg_dump won't emit a \connect command, we must */
+			else if (archDumpFormat == archNull)
+				fprintf(OPF, "\\connect %s\n\n", dbname);
 			else
 			{
-				create_opts = "";
-				/* Since pg_dump won't emit a \connect command, we must */
-				fprintf(OPF, "\\connect %s\n\n", dbname);
+				PQExpBuffer	qry = createPQExpBuffer();
+
+				appendPQExpBuffer(qry, "\\connect %s\n\n", dbname);
+				createOneArchiveEntry(qry->data, "CONNECT");
+				destroyPQExpBuffer(qry);
 			}
 		}
 		else
 			create_opts = "--create";
 
-		if (filename)
+		if (filename && archDumpFormat == archNull)
 			fclose(OPF);
 
-		ret = runPgDump(dbname, create_opts);
+		/*
+		 * If this is not a plain-format dump, construct the per-database
+		 * archive path and append the dboid and dbname to the map.dat file.
+		 */
+		if (archDumpFormat != archNull)
+		{
+			if (archDumpFormat == archCustom)
+				snprintf(dbfilepath, MAXPGPATH, "\"%s\"/\"%s\".dmp", db_subdir, oid);
+			else if (archDumpFormat == archTar)
+				snprintf(dbfilepath, MAXPGPATH, "\"%s\"/\"%s\".tar", db_subdir, oid);
+			else
+				snprintf(dbfilepath, MAXPGPATH, "\"%s\"/\"%s\"", db_subdir, oid);
+
+			/* Add a one-line dboid/dbname entry to the map file. */
+			fprintf(map_file, "%s %s\n", oid, dbname);
+		}
+
+		ret = runPgDump(dbname, create_opts, dbfilepath, archDumpFormat);
 		if (ret != 0)
 			pg_fatal("pg_dump failed on database \"%s\", exiting", dbname);
 
-		if (filename)
+		if (filename && archDumpFormat == archNull)
 		{
 			OPF = fopen(filename, PG_BINARY_A);
 			if (!OPF)
@@ -1695,6 +2007,10 @@ dumpDatabases(PGconn *conn)
 		}
 	}
 
+	/* Close map file */
+	if (archDumpFormat != archNull)
+		fclose(map_file);
+
 	PQclear(res);
 }
 
@@ -1704,7 +2020,8 @@ dumpDatabases(PGconn *conn)
  * Run pg_dump on dbname, with specified options.
  */
 static int
-runPgDump(const char *dbname, const char *create_opts)
+runPgDump(const char *dbname, const char *create_opts, char *dbfile,
+		  ArchiveFormat archDumpFormat)
 {
 	PQExpBufferData connstrbuf;
 	PQExpBufferData cmd;
@@ -1713,17 +2030,36 @@ runPgDump(const char *dbname, const char *create_opts)
 	initPQExpBuffer(&connstrbuf);
 	initPQExpBuffer(&cmd);
 
-	printfPQExpBuffer(&cmd, "\"%s\" %s %s", pg_dump_bin,
-					  pgdumpopts->data, create_opts);
-
 	/*
-	 * If we have a filename, use the undocumented plain-append pg_dump
-	 * format.
+	 * If this is not a plain-format dump, append the output file name and the
+	 * dump format to the pg_dump command so that it produces an archive.
 	 */
-	if (filename)
-		appendPQExpBufferStr(&cmd, " -Fa ");
+	if (archDumpFormat != archNull)
+	{
+		printfPQExpBuffer(&cmd, "\"%s\" -f %s %s", pg_dump_bin,
+						  dbfile, create_opts);
+
+		if (archDumpFormat == archDirectory)
+			appendPQExpBufferStr(&cmd, "  --format=directory ");
+		else if (archDumpFormat == archCustom)
+			appendPQExpBufferStr(&cmd, "  --format=custom ");
+		else if (archDumpFormat == archTar)
+			appendPQExpBufferStr(&cmd, "  --format=tar ");
+	}
 	else
-		appendPQExpBufferStr(&cmd, " -Fp ");
+	{
+		printfPQExpBuffer(&cmd, "\"%s\" %s %s", pg_dump_bin,
+						  pgdumpopts->data, create_opts);
+
+		/*
+		 * If we have a filename, use the undocumented plain-append pg_dump
+		 * format.
+		 */
+		if (filename)
+			appendPQExpBufferStr(&cmd, " -Fa ");
+		else
+			appendPQExpBufferStr(&cmd, " -Fp ");
+	}
 
 	/*
 	 * Append the database name to the already-constructed stem of connection
@@ -1807,7 +2143,18 @@ dumpTimestamp(const char *msg)
 	time_t		now = time(NULL);
 
 	if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&now)) != 0)
-		fprintf(OPF, "-- %s %s\n\n", msg, buf);
+	{
+		if (archDumpFormat == archNull)
+			fprintf(OPF, "-- %s %s\n\n", msg, buf);
+		else
+		{
+			PQExpBuffer	qry = createPQExpBuffer();
+
+			appendPQExpBuffer(qry, "-- %s %s\n\n", msg, buf);
+			createOneArchiveEntry(qry->data, "COMMENT");
+			destroyPQExpBuffer(qry);
+		}
+	}
 }
 
 /*
@@ -1868,3 +2215,54 @@ read_dumpall_filters(const char *filename, SimpleStringList *pattern)
 
 	filter_free(&fstate);
 }
+
+/*
+ * parseDumpFormat
+ *
+ * Parse and validate the dump format given with -F/--format.
+ */
+static ArchiveFormat
+parseDumpFormat(const char *format)
+{
+	ArchiveFormat archDumpFormat;
+
+	if (pg_strcasecmp(format, "c") == 0)
+		archDumpFormat = archCustom;
+	else if (pg_strcasecmp(format, "custom") == 0)
+		archDumpFormat = archCustom;
+	else if (pg_strcasecmp(format, "d") == 0)
+		archDumpFormat = archDirectory;
+	else if (pg_strcasecmp(format, "directory") == 0)
+		archDumpFormat = archDirectory;
+	else if (pg_strcasecmp(format, "p") == 0)
+		archDumpFormat = archNull;
+	else if (pg_strcasecmp(format, "plain") == 0)
+		archDumpFormat = archNull;
+	else if (pg_strcasecmp(format, "t") == 0)
+		archDumpFormat = archTar;
+	else if (pg_strcasecmp(format, "tar") == 0)
+		archDumpFormat = archTar;
+	else
+		pg_fatal("unrecognized output format \"%s\"; please specify \"c\", \"d\", \"p\", or \"t\"",
+				 format);
+
+	return archDumpFormat;
+}
+
+static int
+createDumpId(void)
+{
+	return ++dumpIdVal;
+}
+
+static void
+createOneArchiveEntry(const char *query, const char *tag)
+{
+	ArchiveEntry(fout,
+			nilCatalogId, /* catalog ID */
+			createDumpId(), /* dump ID */
+			ARCHIVE_OPTS(.tag = tag,
+				.description = tag,
+				.section = SECTION_PRE_DATA,
+				.createStmt = query));
+}
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index c9776306c5c..02176a77bd7 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -2,7 +2,7 @@
  *
  * pg_restore.c
  *	pg_restore is an utility extracting postgres database definitions
- *	from a backup archive created by pg_dump using the archiver
+ *	from a backup archive created by pg_dump/pg_dumpall using the archiver
  *	interface.
  *
  *	pg_restore will read the backup archive and
@@ -41,31 +41,60 @@
 #include "postgres_fe.h"
 
 #include <ctype.h>
+#include <sys/stat.h>
 #ifdef HAVE_TERMIOS_H
 #include <termios.h>
 #endif
 
+#include "common/string.h"
+#include "connectdb.h"
 #include "dumputils.h"
 #include "fe_utils/option_utils.h"
+#include "fe_utils/string_utils.h"
 #include "filter.h"
 #include "getopt_long.h"
 #include "parallel.h"
 #include "pg_backup_utils.h"
 
+
 static void usage(const char *progname);
 static void read_restore_filters(const char *filename, RestoreOptions *opts);
+static bool file_exists_in_directory(const char *dir, const char *filename);
+static int	restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
+								 int numWorkers, bool append_data, int num,
+								 bool globals_only);
+static int restore_global_objects(const char *inputFileSpec, RestoreOptions *opts,
+		int numWorkers, bool append_data, int num, bool globals_only);
+static int	restore_all_databases(const char *inputFileSpec,
+								  SimpleStringList db_exclude_patterns, RestoreOptions *opts, int numWorkers);
+static int	get_dbnames_list_to_restore(PGconn *conn,
+										SimplePtrList *dbname_oid_list,
+										SimpleStringList db_exclude_patterns);
+static int	get_dbname_oid_list_from_mfile(const char *dumpdirpath,
+										   SimplePtrList *dbname_oid_list);
+
+/*
+ * Stores a database OID and the corresponding name.
+ */
+typedef struct DbOidName
+{
+	Oid			oid;
+	char		str[FLEXIBLE_ARRAY_MEMBER]; /* null-terminated string here */
+} DbOidName;
+
 
 int
 main(int argc, char **argv)
 {
 	RestoreOptions *opts;
 	int			c;
-	int			exit_code;
 	int			numWorkers = 1;
-	Archive    *AH;
 	char	   *inputFileSpec;
 	bool		data_only = false;
 	bool		schema_only = false;
+	int			n_errors = 0;
+	bool		globals_only = false;
+	SimpleStringList db_exclude_patterns = {NULL, NULL};
 	static int	disable_triggers = 0;
 	static int	enable_row_security = 0;
 	static int	if_exists = 0;
@@ -89,6 +118,7 @@ main(int argc, char **argv)
 		{"clean", 0, NULL, 'c'},
 		{"create", 0, NULL, 'C'},
 		{"data-only", 0, NULL, 'a'},
+		{"globals-only", 0, NULL, 'g'},
 		{"dbname", 1, NULL, 'd'},
 		{"exit-on-error", 0, NULL, 'e'},
 		{"exclude-schema", 1, NULL, 'N'},
@@ -142,6 +172,7 @@ main(int argc, char **argv)
 		{"statistics-only", no_argument, &statistics_only, 1},
 		{"filter", required_argument, NULL, 4},
 		{"restrict-key", required_argument, NULL, 6},
+		{"exclude-database", required_argument, NULL, 7},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -170,7 +201,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "acCd:ef:F:h:I:j:lL:n:N:Op:P:RsS:t:T:U:vwWx1",
+	while ((c = getopt_long(argc, argv, "acCd:ef:F:gh:I:j:lL:n:N:Op:P:RsS:t:T:U:vwWx1",
 							cmdopts, NULL)) != -1)
 	{
 		switch (c)
@@ -197,11 +228,14 @@ main(int argc, char **argv)
 				if (strlen(optarg) != 0)
 					opts->formatName = pg_strdup(optarg);
 				break;
+			case 'g':
+				/* restore only the global objects from the archive */
+				globals_only = true;
+				break;
 			case 'h':
 				if (strlen(optarg) != 0)
 					opts->cparams.pghost = pg_strdup(optarg);
 				break;
-
 			case 'j':			/* number of restore jobs */
 				if (!option_parse_int(optarg, "-j/--jobs", 1,
 									  PG_MAX_JOBS,
@@ -316,6 +350,9 @@ main(int argc, char **argv)
 					exit(1);
 				opts->exit_on_error = true;
 				break;
+			case 7:				/* database patterns to skip */
+				simple_string_list_append(&db_exclude_patterns, optarg);
+				break;
 
 			case 6:
 				opts->restrict_key = pg_strdup(optarg);
@@ -347,6 +384,13 @@ main(int argc, char **argv)
 	if (!opts->cparams.dbname && !opts->filename && !opts->tocSummary)
 		pg_fatal("one of -d/--dbname and -f/--file must be specified");
 
+	if (db_exclude_patterns.head != NULL && globals_only)
+	{
+		pg_log_error("option --exclude-database cannot be used together with -g/--globals-only");
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit_nicely(1);
+	}
+
 	/* Should get at most one of -d and -f, else user is confused */
 	if (opts->cparams.dbname)
 	{
@@ -472,6 +516,105 @@ main(int argc, char **argv)
 					 opts->formatName);
 	}
 
+	/*
+	 * If the archive was created by pg_dumpall (it contains map.dat, toc.tar,
+	 * or toc.dmp), restore the globals and then all the databases listed in
+	 * map.dat, skipping any that match --exclude-database patterns.
+	 */
+	if (inputFileSpec != NULL &&
+			(file_exists_in_directory(inputFileSpec, "map.dat") ||
+			 file_exists_in_directory(inputFileSpec, "toc.tar") ||
+			 file_exists_in_directory(inputFileSpec, "toc.dmp")))
+	{
+		char        global_path[MAXPGPATH];
+
+		if (file_exists_in_directory(inputFileSpec, "toc.tar"))
+			snprintf(global_path, MAXPGPATH, "%s/toc.tar", inputFileSpec);
+		else if (file_exists_in_directory(inputFileSpec, "toc.dmp"))
+			snprintf(global_path, MAXPGPATH, "%s/toc.dmp", inputFileSpec);
+		else
+			snprintf(global_path, MAXPGPATH, "%s", inputFileSpec);
+
+		/*
+		 * Can only use --list or --use-list options with a single database
+		 * dump.
+		 */
+		if (opts->tocSummary)
+			pg_fatal("option -l/--list cannot be used when restoring an archive created by pg_dumpall");
+		else if (opts->tocFile)
+			pg_fatal("option -L/--use-list cannot be used when restoring an archive created by pg_dumpall");
+
+		/*
+		 * To restore from a pg_dumpall archive, -C (create database) option
+		 * must be specified unless we are only restoring globals.
+		 */
+		if (!globals_only && opts->createDB != 1)
+		{
+			pg_log_error("option -C/--create must be specified when restoring an archive created by pg_dumpall");
+			pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+			pg_log_error_hint("Individual databases can be restored using their specific archives.");
+			exit_nicely(1);
+		}
+
+		/* With --globals-only, restore only the global objects. */
+		if (globals_only)
+		{
+			n_errors = restore_global_objects(global_path, opts, numWorkers, false, 0, globals_only);
+
+			pg_log_info("databases were not restored because option -g/--globals-only was specified");
+		}
+		else
+		{
+			/* Now restore all the databases from map.dat */
+			n_errors = restore_all_databases(inputFileSpec, db_exclude_patterns,
+											 opts, numWorkers);
+		}
+
+		/* Free db pattern list. */
+		simple_string_list_destroy(&db_exclude_patterns);
+	}
+	else						/* not a pg_dumpall archive; restore a single database */
+	{
+		n_errors = restore_one_database(inputFileSpec, opts, numWorkers, false, 0, globals_only);
+	}
+
+	/* Done, print a summary of ignored errors during restore. */
+	if (n_errors)
+	{
+		pg_log_warning("errors ignored on restore: %d", n_errors);
+		return 1;
+	}
+
+	return 0;
+}
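+
+/*
+ * Illustrative usage (not part of the patch; the paths and pattern are
+ * examples only): restoring a pg_dumpall archive with pg_restore requires -C
+ * unless -g/--globals-only is given, e.g.
+ *
+ *     pg_restore -C -d postgres /path/to/dumpdir
+ *     pg_restore -g -d postgres /path/to/dumpdir
+ *     pg_restore -C -d postgres --exclude-database='test*' /path/to/dumpdir
+ */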
+
+/*
+ * restore_global_objects
+ *
+ * Restore all global objects.
+ *
+ * If globals_only is set, DROP DATABASE commands are skipped during the
+ * restore.
+ */
+static int
+restore_global_objects(const char *inputFileSpec, RestoreOptions *opts,
+						int numWorkers, bool append_data, int num, bool globals_only)
+{
+	return restore_one_database(inputFileSpec, opts, numWorkers, append_data, num, globals_only);
+}
+
+/*
+ * restore_one_database
+ *
+ * Restore a single database from the given archive.
+ *
+ * Returns the number of errors encountered during the restore.
+ */
+static int
+restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
+					 int numWorkers, bool append_data, int num, bool globals_only)
+{
+	Archive    *AH;
+	int			n_errors;
+
 	AH = OpenArchive(inputFileSpec, opts->format);
 
 	SetArchiveOptions(AH, NULL, opts);
@@ -479,9 +622,15 @@ main(int argc, char **argv)
 	/*
 	 * We don't have a connection yet but that doesn't matter. The connection
 	 * is initialized to NULL and if we terminate through exit_nicely() while
-	 * it's still NULL, the cleanup function will just be a no-op.
+	 * it's still NULL, the cleanup function will just be a no-op. If we are
+	 * restoring multiple databases, the previous archive handle is already
+	 * registered in the on-exit array and its connection has been closed, so
+	 * we just replace it in the same array slot.
 	 */
-	on_exit_close_archive(AH);
+	if (!append_data || num == 0)
+		on_exit_close_archive(AH);
+	else
+		replace_on_exit_close_archive(AH);
 
 	/* Let the archiver know how noisy to be */
 	AH->verbose = opts->verbose;
@@ -501,25 +650,21 @@ main(int argc, char **argv)
 	else
 	{
 		ProcessArchiveRestoreOptions(AH);
-		RestoreArchive(AH);
+		RestoreArchive(AH, append_data, globals_only);
 	}
 
-	/* done, print a summary of ignored errors */
-	if (AH->n_errors)
-		pg_log_warning("errors ignored on restore: %d", AH->n_errors);
+	n_errors = AH->n_errors;
 
 	/* AH may be freed in CloseArchive? */
-	exit_code = AH->n_errors ? 1 : 0;
-
 	CloseArchive(AH);
 
-	return exit_code;
+	return n_errors;
 }
 
 static void
 usage(const char *progname)
 {
-	printf(_("%s restores a PostgreSQL database from an archive created by pg_dump.\n\n"), progname);
+	printf(_("%s restores PostgreSQL databases from archives created by pg_dump or pg_dumpall.\n\n"), progname);
 	printf(_("Usage:\n"));
 	printf(_("  %s [OPTION]... [FILE]\n"), progname);
 
@@ -537,6 +682,7 @@ usage(const char *progname)
 	printf(_("  -c, --clean                  clean (drop) database objects before recreating\n"));
 	printf(_("  -C, --create                 create the target database\n"));
 	printf(_("  -e, --exit-on-error          exit on error, default is to continue\n"));
+	printf(_("  -g, --globals-only           restore only global objects, no databases\n"));
 	printf(_("  -I, --index=NAME             restore named index\n"));
 	printf(_("  -j, --jobs=NUM               use this many parallel jobs to restore\n"));
 	printf(_("  -L, --use-list=FILENAME      use table of contents from this file for\n"
@@ -553,6 +699,7 @@ usage(const char *progname)
 	printf(_("  -1, --single-transaction     restore as a single transaction\n"));
 	printf(_("  --disable-triggers           disable triggers during data-only restore\n"));
 	printf(_("  --enable-row-security        enable row security\n"));
+	printf(_("  --exclude-database=PATTERN   do not restore the specified database(s)\n"));
 	printf(_("  --filter=FILENAME            restore or skip objects based on expressions\n"
 			 "                               in FILENAME\n"));
 	printf(_("  --if-exists                  use IF EXISTS when dropping objects\n"));
@@ -588,8 +735,8 @@ usage(const char *progname)
 	printf(_("  --role=ROLENAME          do SET ROLE before restore\n"));
 
 	printf(_("\n"
-			 "The options -I, -n, -N, -P, -t, -T, and --section can be combined and specified\n"
-			 "multiple times to select multiple objects.\n"));
+			 "The options -I, -n, -N, -P, -t, -T, --section, and --exclude-database can be\n"
+			 "combined and specified multiple times to select multiple objects.\n"));
 	printf(_("\nIf no input file name is supplied, then standard input is used.\n\n"));
 	printf(_("Report bugs to <%s>.\n"), PACKAGE_BUGREPORT);
 	printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
@@ -694,3 +841,415 @@ read_restore_filters(const char *filename, RestoreOptions *opts)
 
 	filter_free(&fstate);
 }
+
+/*
+ * file_exists_in_directory
+ *
+ * Returns true if the file exists in the given directory.
+ */
+static bool
+file_exists_in_directory(const char *dir, const char *filename)
+{
+	struct stat st;
+	char		buf[MAXPGPATH];
+
+	if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
+		pg_fatal("directory name too long: \"%s\"", dir);
+
+	return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
+}
+
+/*
+ * get_dbnames_list_to_restore
+ *
+ * Mark for skipping any dbname_oid_list entries that match a pattern in the
+ * db_exclude_patterns list.
+ *
+ * Returns the number of databases to be restored.
+ */
+static int
+get_dbnames_list_to_restore(PGconn *conn,
+							SimplePtrList *dbname_oid_list,
+							SimpleStringList db_exclude_patterns)
+{
+	int			count_db = 0;
+	PQExpBuffer query;
+	PGresult   *res;
+
+	query = createPQExpBuffer();
+
+	if (!conn && db_exclude_patterns.head != NULL)
+		pg_log_info("no database connection is available, so --exclude-database patterns are matched as literal names");
+
+	/*
+	 * Walk the database name list and mark any entry that matches an exclude
+	 * pattern so that it is skipped during the restore.
+	 */
+	for (SimplePtrListCell *db_cell = dbname_oid_list->head;
+		 db_cell; db_cell = db_cell->next)
+	{
+		DbOidName  *dbidname = (DbOidName *) db_cell->ptr;
+		bool		skip_db_restore = false;
+		PQExpBuffer db_lit = createPQExpBuffer();
+
+		appendStringLiteralConn(db_lit, dbidname->str, conn);
+
+		for (SimpleStringListCell *pat_cell = db_exclude_patterns.head; pat_cell; pat_cell = pat_cell->next)
+		{
+			/*
+			 * If there is an exact match then we don't need to try a pattern
+			 * match
+			 */
+			if (pg_strcasecmp(dbidname->str, pat_cell->val) == 0)
+				skip_db_restore = true;
+			/* Otherwise, try a pattern match if there is a connection */
+			else if (conn)
+			{
+				int			dotcnt;
+
+				appendPQExpBufferStr(query, "SELECT 1 ");
+				processSQLNamePattern(conn, query, pat_cell->val, false,
+									  false, NULL, db_lit->data,
+									  NULL, NULL, NULL, &dotcnt);
+
+				if (dotcnt > 0)
+				{
+					pg_log_error("improper qualified name (too many dotted names): %s",
+								 dbidname->str);
+					PQfinish(conn);
+					exit_nicely(1);
+				}
+
+				res = executeQuery(conn, query->data);
+
+				if ((PQresultStatus(res) == PGRES_TUPLES_OK) && PQntuples(res))
+				{
+					skip_db_restore = true;
+					pg_log_info("database name \"%s\" matches exclude pattern \"%s\"", dbidname->str, pat_cell->val);
+				}
+
+				PQclear(res);
+				resetPQExpBuffer(query);
+			}
+
+			if (skip_db_restore)
+				break;
+		}
+
+		destroyPQExpBuffer(db_lit);
+
+		/*
+		 * Mark db to be skipped or increment the counter of dbs to be
+		 * restored
+		 */
+		if (skip_db_restore)
+		{
+			pg_log_info("excluding database \"%s\"", dbidname->str);
+			dbidname->oid = InvalidOid;
+		}
+		else
+		{
+			count_db++;
+		}
+	}
+
+	destroyPQExpBuffer(query);
+
+	return count_db;
+}
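+
+/*
+ * Illustrative note (not part of the patch): when a connection is available,
+ * the exclusion check above is done server-side.  processSQLNamePattern()
+ * turns each --exclude-database pattern into a regular-expression test
+ * against the quoted database name, so the generated query is roughly
+ *
+ *     SELECT 1 WHERE 'dbname' ~ '^(pattern)$'
+ *
+ * and a returned row means the database is excluded.
+ */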
+
+/*
+ * get_dbname_oid_list_from_mfile
+ *
+ * Open the map.dat file, read it line by line, and build a list of database
+ * names and their corresponding OIDs.
+ *
+ * Returns the total number of database names found in the map.dat file.
+ */
+static int
+get_dbname_oid_list_from_mfile(const char *dumpdirpath, SimplePtrList *dbname_oid_list)
+{
+	StringInfoData linebuf;
+	FILE	   *pfile;
+	char		map_file_path[MAXPGPATH];
+	int			count = 0;
+
+	/*
+	 * If the dump directory does not contain a map.dat file, there are no
+	 * databases to restore, so return.
+	 */
+	if (!file_exists_in_directory(dumpdirpath, "map.dat"))
+	{
+		pg_log_info("database restoring is skipped because file \"%s\" does not exist in directory \"%s\"", "map.dat", dumpdirpath);
+		return 0;
+	}
+
+	snprintf(map_file_path, MAXPGPATH, "%s/map.dat", dumpdirpath);
+
+	/* Open map.dat file. */
+	pfile = fopen(map_file_path, PG_BINARY_R);
+
+	if (pfile == NULL)
+		pg_fatal("could not open file \"%s\": %m", map_file_path);
+
+	initStringInfo(&linebuf);
+
+	/* Append all the dbname/db_oid combinations to the list. */
+	while (pg_get_line_buf(pfile, &linebuf))
+	{
+		Oid			db_oid = InvalidOid;
+		char	   *dbname;
+		DbOidName  *dbidname;
+		int			namelen;
+		char	   *p = linebuf.data;
+
+		/* Extract dboid. */
+		while (isdigit((unsigned char) *p))
+			p++;
+		if (p > linebuf.data && *p == ' ')
+		{
+			sscanf(linebuf.data, "%u", &db_oid);
+			p++;
+		}
+
+		/* dbname is the rest of the line */
+		dbname = p;
+		namelen = strlen(dbname);
+
+		/* Report error and exit if the file has any corrupted data. */
+		if (!OidIsValid(db_oid) || namelen <= 1)
+			pg_fatal("invalid entry in file \"%s\" on line %d", map_file_path,
+					 count + 1);
+
+		pg_log_info("found database \"%s\" (OID: %u) in file \"%s\"",
+					dbname, db_oid, map_file_path);
+
+		dbidname = pg_malloc(offsetof(DbOidName, str) + namelen + 1);
+		dbidname->oid = db_oid;
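+		/*
+		 * pg_get_line_buf() retains the trailing newline; strlcpy() with a
+		 * limit of namelen copies at most namelen - 1 bytes, which drops
+		 * that newline from the stored name.
+		 */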
+		strlcpy(dbidname->str, dbname, namelen);
+
+		simple_ptr_list_append(dbname_oid_list, dbidname);
+		count++;
+	}
+
+	/* Close map.dat file. */
+	fclose(pfile);
+
+	return count;
+}
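+
+/*
+ * For illustration only (the OIDs and names below are made up): map.dat is
+ * expected to contain one "<oid> <dbname>" entry per line, e.g.
+ *
+ *     16384 template1
+ *     16385 my db
+ *
+ * The parser above reads the leading digits as the OID and takes the rest of
+ * the line, minus the trailing newline, as the database name.
+ */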
+
+/*
+ * restore_all_databases
+ *
+ * Restore all databases whose dumps are present in the archive directory,
+ * using the map.dat file to map OIDs to database names.
+ *
+ * Databases matching an --exclude-database pattern are skipped.
+ *
+ * Returns the number of errors encountered during the restore.
+ */
+static int
+restore_all_databases(const char *inputFileSpec,
+					  SimpleStringList db_exclude_patterns, RestoreOptions *opts,
+					  int numWorkers)
+{
+	SimplePtrList dbname_oid_list = {NULL, NULL};
+	int			num_db_restore = 0;
+	int			num_total_db;
+	int			n_errors_total;
+	int			count = 0;
+	char	   *connected_db = NULL;
+	bool		dumpData = opts->dumpData;
+	bool		dumpSchema = opts->dumpSchema;
+	bool		dumpStatistics = opts->dumpStatistics;
+	PGconn	   *conn = NULL;
+	char		global_path[MAXPGPATH];
+
+	/* Point global_path at toc.tar or toc.dmp if present, else the directory. */
+	if (file_exists_in_directory(inputFileSpec, "toc.tar"))
+		snprintf(global_path, MAXPGPATH, "%s/toc.tar", inputFileSpec);
+	else if (file_exists_in_directory(inputFileSpec, "toc.dmp"))
+		snprintf(global_path, MAXPGPATH, "%s/toc.dmp", inputFileSpec);
+	else
+		snprintf(global_path, MAXPGPATH, "%s", inputFileSpec);
+
+	/* Save the connection database name so it can be reused for each database. */
+	if (opts->cparams.dbname)
+		connected_db = opts->cparams.dbname;
+
+	num_total_db = get_dbname_oid_list_from_mfile(inputFileSpec, &dbname_oid_list);
+
+	/* If map.dat has no entries, return after processing global commands. */
+	if (dbname_oid_list.head == NULL)
+		return restore_global_objects(global_path, opts, numWorkers, false, 0, false);
+
+	pg_log_info(ngettext("found %d database name in \"%s\"",
+						 "found %d database names in \"%s\"",
+						 num_total_db),
+				num_total_db, "map.dat");
+
+	/*
+	 * If --exclude-database patterns were given, connect to a database so
+	 * that the patterns can be matched against database names.
+	 */
+	if (db_exclude_patterns.head != NULL)
+	{
+		if (opts->cparams.dbname)
+		{
+			conn = ConnectDatabase(opts->cparams.dbname, NULL, opts->cparams.pghost,
+					opts->cparams.pgport, opts->cparams.username, TRI_DEFAULT,
+					false, progname, NULL, NULL, NULL, NULL);
+
+			if (!conn)
+				pg_fatal("could not connect to database \"%s\"", opts->cparams.dbname);
+		}
+
+		if (!conn)
+		{
+			pg_log_info("trying to connect to database \"%s\"", "postgres");
+
+			conn = ConnectDatabase("postgres", NULL, opts->cparams.pghost,
+					opts->cparams.pgport, opts->cparams.username, TRI_DEFAULT,
+					false, progname, NULL, NULL, NULL, NULL);
+
+			/* Try with template1. */
+			if (!conn)
+			{
+				pg_log_info("trying to connect to database \"%s\"", "template1");
+
+				conn = ConnectDatabase("template1", NULL, opts->cparams.pghost,
+						opts->cparams.pgport, opts->cparams.username, TRI_DEFAULT,
+						false, progname, NULL, NULL, NULL, NULL);
+			}
+		}
+	}
+
+	/*
+	 * filter the db list according to the exclude patterns
+	 */
+	num_db_restore = get_dbnames_list_to_restore(conn, &dbname_oid_list,
+												 db_exclude_patterns);
+
+	/* Close the connection; we are done with the exclude-pattern matching. */
+	if (conn)
+		PQfinish(conn);
+
+	/* Restore the global objects from the globals archive. */
+	n_errors_total = restore_global_objects(global_path, opts, numWorkers, false, 0, false);
+
+	/* Exit if no db needs to be restored. */
+	if (dbname_oid_list.head == NULL || num_db_restore == 0)
+	{
+		pg_log_info(ngettext("no database needs restoring out of %d database",
+							 "no database needs restoring out of %d databases", num_total_db),
+					num_total_db);
+		return n_errors_total;
+	}
+
+	pg_log_info("need to restore %d databases out of %d databases", num_db_restore, num_total_db);
+
+	/*
+	 * We have a list of databases to restore after processing the
+	 * exclude-database switch(es).  Now we can restore them one by one.
+	 */
+	for (SimplePtrListCell *db_cell = dbname_oid_list.head;
+		 db_cell; db_cell = db_cell->next)
+	{
+		DbOidName  *dbidname = (DbOidName *) db_cell->ptr;
+		char		subdirpath[MAXPGPATH];
+		char		subdirdbpath[MAXPGPATH];
+		char		dbfilename[MAXPGPATH];
+		int			n_errors;
+
+		/* ignore dbs marked for skipping */
+		if (dbidname->oid == InvalidOid)
+			continue;
+
+		/*
+		 * Reset override_dbname (set when -d/--dbname is used) so that
+		 * objects can be restored into an already-created database.
+		 */
+		if (opts->cparams.override_dbname)
+		{
+			pfree(opts->cparams.override_dbname);
+			opts->cparams.override_dbname = NULL;
+		}
+
+		snprintf(subdirdbpath, MAXPGPATH, "%s/databases", inputFileSpec);
+
+		/*
+		 * Look for the database dump file/dir. If there is an {oid}.tar or
+		 * {oid}.dmp file, use it. Otherwise try to use a directory called
+		 * {oid}
+		 */
+		snprintf(dbfilename, MAXPGPATH, "%u.tar", dbidname->oid);
+		if (file_exists_in_directory(subdirdbpath, dbfilename))
+			snprintf(subdirpath, MAXPGPATH, "%s/databases/%u.tar", inputFileSpec, dbidname->oid);
+		else
+		{
+			snprintf(dbfilename, MAXPGPATH, "%u.dmp", dbidname->oid);
+
+			if (file_exists_in_directory(subdirdbpath, dbfilename))
+				snprintf(subdirpath, MAXPGPATH, "%s/databases/%u.dmp", inputFileSpec, dbidname->oid);
+			else
+				snprintf(subdirpath, MAXPGPATH, "%s/databases/%u", inputFileSpec, dbidname->oid);
+		}
+
+		pg_log_info("restoring database \"%s\"", dbidname->str);
+
+		/* If the database already exists, don't set the createDB flag. */
+		if (opts->cparams.dbname)
+		{
+			PGconn	   *test_conn;
+
+			test_conn = ConnectDatabase(dbidname->str, NULL, opts->cparams.pghost,
+										opts->cparams.pgport, opts->cparams.username, TRI_DEFAULT,
+										false, progname, NULL, NULL, NULL, NULL);
+			if (test_conn)
+			{
+				PQfinish(test_conn);
+
+				/* Use already created database for connection. */
+				opts->createDB = 0;
+				opts->cparams.dbname = dbidname->str;
+			}
+			else
+			{
+				/* we'll have to create it */
+				opts->createDB = 1;
+				opts->cparams.dbname = connected_db;
+			}
+		}
+
+		/*
+		 * Reset the flags, which might have been changed by
+		 * pg_backup_archiver.c during the previous restore.
+		 */
+		opts->dumpData = dumpData;
+		opts->dumpSchema = dumpSchema;
+		opts->dumpStatistics = dumpStatistics;
+
+		/* Restore the single database. */
+		n_errors = restore_one_database(subdirpath, opts, numWorkers, true, 1, false);
+
+		/* Print a summary of ignored errors during single database restore. */
+		if (n_errors)
+		{
+			n_errors_total += n_errors;
+			pg_log_warning("errors ignored on database \"%s\" restore: %d", dbidname->str, n_errors);
+		}
+
+		count++;
+	}
+
+	/* Log number of processed databases. */
+	pg_log_info("number of restored databases is %d", num_db_restore);
+
+	/* Free dbname and dboid list. */
+	simple_ptr_list_destroy(&dbname_oid_list);
+
+	return n_errors_total;
+}
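+
+/*
+ * Illustrative note (not part of the patch; the layout below is a sketch
+ * derived from the code above): a non-plain-text pg_dumpall archive
+ * directory looks roughly like
+ *
+ *     dumpdir/
+ *         toc.dat | toc.dmp | toc.tar         global objects, per -F format
+ *         map.dat                             "<oid> <dbname>" lines
+ *         databases/
+ *             <oid>, <oid>.dmp or <oid>.tar   one archive per database
+ *
+ * restore_all_databases() walks map.dat and, for each entry, picks the
+ * matching per-database archive under databases/.
+ */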
diff --git a/src/bin/pg_dump/t/001_basic.pl b/src/bin/pg_dump/t/001_basic.pl
old mode 100644
new mode 100755
index 37d893d5e6a..56e89da1e5e
--- a/src/bin/pg_dump/t/001_basic.pl
+++ b/src/bin/pg_dump/t/001_basic.pl
@@ -237,6 +237,12 @@ command_fails_like(
 	'pg_restore: options -C\/--create and -1\/--single-transaction cannot be used together'
 );
 
+command_fails_like(
+	[ 'pg_restore', '--exclude-database=foo', '--globals-only', '-d', 'xxx' ],
+	qr/\Qpg_restore: error: option --exclude-database cannot be used together with -g\/--globals-only\E/,
+	'pg_restore: option --exclude-database cannot be used together with -g/--globals-only'
+);
+
 # also fails for -r and -t, but it seems pointless to add more tests for those.
 command_fails_like(
 	[ 'pg_dumpall', '--exclude-database=foo', '--globals-only' ],
@@ -244,4 +250,8 @@ command_fails_like(
 	'pg_dumpall: option --exclude-database cannot be used together with -g/--globals-only'
 );
 
+command_fails_like(
+	[ 'pg_dumpall', '--format', 'x' ],
+	qr/\Qpg_dumpall: error: unrecognized output format "x";\E/,
+	'pg_dumpall: unrecognized output format');
 done_testing();
diff --git a/src/bin/pg_dump/t/007_pg_dumpall.pl b/src/bin/pg_dump/t/007_pg_dumpall.pl
new file mode 100755
index 00000000000..3c7d2ad7c53
--- /dev/null
+++ b/src/bin/pg_dump/t/007_pg_dumpall.pl
@@ -0,0 +1,396 @@
+# Copyright (c) 2021-2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $tempdir = PostgreSQL::Test::Utils::tempdir;
+my $run_db = 'postgres';
+my $sep = $windows_os ? "\\" : "/";
+
+# Tablespace locations used by "restore_tablespace" test case.
+my $tablespace1 = "${tempdir}${sep}tbl1";
+my $tablespace2 = "${tempdir}${sep}tbl2";
+mkdir($tablespace1) || die "mkdir $tablespace1 $!";
+mkdir($tablespace2) || die "mkdir $tablespace2 $!";
+
+# Escape tablespace locations on Windows.
+$tablespace1 = $windows_os ? ($tablespace1 =~ s/\\/\\\\/gr) : $tablespace1;
+$tablespace2 = $windows_os ? ($tablespace2 =~ s/\\/\\\\/gr) : $tablespace2;
+
+# Where pg_dumpall will be executed.
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+
+###############################################################
+# Definition of the pg_dumpall test cases to run.
+#
+# Each test case has a name, which is used for failure reporting and to name
+# the files holding the dump and restore output that the test asserts on.
+#
+# The "setup_sql" is a valid psql script containing SQL commands to run before
+# the tests. All setups are executed before any test is run.
+#
+# The "dump_cmd" and "restore_cmd" are the commands that will be executed. The
+# "restore_cmd" must use the --file flag to save the restore output so that we
+# can assert on it.
+#
+# The "like" and "unlike" entries are regexps matched against the pg_restore
+# output. Each test case must provide at least one of them and may provide
+# both; see the "excluding_databases" test case for an example.
+my %pgdumpall_runs = (
+	restore_roles => {
+		setup_sql => '
+		CREATE ROLE dumpall WITH ENCRYPTED PASSWORD \'admin\' SUPERUSER;
+		CREATE ROLE dumpall2 WITH REPLICATION CONNECTION LIMIT 10;',
+		dump_cmd => [
+			'pg_dumpall',
+			'--format' => 'directory',
+			'--file' => "$tempdir/restore_roles",
+		],
+		restore_cmd => [
+			'pg_restore', '-C',
+			'--format' => 'directory',
+			'--file' => "$tempdir/restore_roles.sql",
+			"$tempdir/restore_roles",
+		],
+		like => qr/
+			\s*\QCREATE ROLE dumpall2;\E
+			\s*\QALTER ROLE dumpall2 WITH NOSUPERUSER INHERIT NOCREATEROLE NOCREATEDB NOLOGIN REPLICATION NOBYPASSRLS CONNECTION LIMIT 10;\E
+		/xm
+	},
+
+	restore_tablespace => {
+		setup_sql => "
+		CREATE ROLE tap;
+		CREATE TABLESPACE tbl1 OWNER tap LOCATION '$tablespace1';
+		CREATE TABLESPACE tbl2 OWNER tap LOCATION '$tablespace2' WITH (seq_page_cost=1.0);",
+		dump_cmd => [
+			'pg_dumpall',
+			'--format' => 'directory',
+			'--file' => "$tempdir/restore_tablespace",
+		],
+		restore_cmd => [
+			'pg_restore', '-C',
+			'--format' => 'directory',
+			'--file' => "$tempdir/restore_tablespace.sql",
+			"$tempdir/restore_tablespace",
+		],
+		# Match "E" as optional since it is added on LOCATION when running on
+		# Windows.
+		like => qr/^
+			\n\QCREATE TABLESPACE tbl2 OWNER tap LOCATION \E(?:E)?\Q'$tablespace2';\E
+			\n\QALTER TABLESPACE tbl2 SET (seq_page_cost=1.0);\E
+		/xm,
+	},
+
+	restore_grants => {
+		setup_sql => "
+		CREATE DATABASE tapgrantsdb;
+		CREATE SCHEMA private;
+		CREATE SEQUENCE serial START 101;
+		CREATE FUNCTION fn() RETURNS void AS \$\$
+		BEGIN
+		END;
+		\$\$ LANGUAGE plpgsql;
+		CREATE ROLE super;
+		CREATE ROLE grant1;
+		CREATE ROLE grant2;
+		CREATE ROLE grant3;
+		CREATE ROLE grant4;
+		CREATE ROLE grant5;
+		CREATE ROLE grant6;
+		CREATE ROLE grant7;
+		CREATE ROLE grant8;
+
+		CREATE TABLE t (id int);
+		INSERT INTO t VALUES (1), (2), (3), (4);
+
+		GRANT SELECT ON TABLE t TO grant1;
+		GRANT INSERT ON TABLE t TO grant2;
+		GRANT ALL PRIVILEGES ON TABLE t to grant3;
+		GRANT CONNECT, CREATE ON DATABASE tapgrantsdb TO grant4;
+		GRANT USAGE, CREATE ON SCHEMA private TO grant5;
+		GRANT USAGE, SELECT, UPDATE ON SEQUENCE serial TO grant6;
+		GRANT super TO grant7;
+		GRANT EXECUTE ON FUNCTION fn() TO grant8;
+		",
+		dump_cmd => [
+			'pg_dumpall',
+			'--format' => 'directory',
+			'--file' => "$tempdir/restore_grants",
+		],
+		restore_cmd => [
+			'pg_restore', '-C',
+			'--format' => 'directory',
+			'--file' => "$tempdir/restore_grants.sql",
+			"$tempdir/restore_grants",
+		],
+		like => qr/^
+			\n\QGRANT super TO grant7 WITH INHERIT TRUE GRANTED BY\E
+			(.*\n)*
+			\n\QGRANT ALL ON SCHEMA private TO grant5;\E
+			(.*\n)*
+			\n\QGRANT ALL ON FUNCTION public.fn() TO grant8;\E
+			(.*\n)*
+			\n\QGRANT ALL ON SEQUENCE public.serial TO grant6;\E
+			(.*\n)*
+			\n\QGRANT SELECT ON TABLE public.t TO grant1;\E
+			\n\QGRANT INSERT ON TABLE public.t TO grant2;\E
+			\n\QGRANT ALL ON TABLE public.t TO grant3;\E
+			(.*\n)*
+			\n\QGRANT CREATE,CONNECT ON DATABASE tapgrantsdb TO grant4;\E
+		/xm,
+	},
+
+	excluding_databases => {
+		setup_sql => 'CREATE DATABASE db1;
+		\c db1
+		CREATE TABLE t1 (id int);
+		INSERT INTO t1 VALUES (1), (2), (3), (4);
+		CREATE TABLE t2 (id int);
+		INSERT INTO t2 VALUES (1), (2), (3), (4);
+
+		CREATE DATABASE db2;
+		\c db2
+		CREATE TABLE t3 (id int);
+		INSERT INTO t3 VALUES (1), (2), (3), (4);
+		CREATE TABLE t4 (id int);
+		INSERT INTO t4 VALUES (1), (2), (3), (4);
+
+		CREATE DATABASE dbex3;
+		\c dbex3
+		CREATE TABLE t5 (id int);
+		INSERT INTO t5 VALUES (1), (2), (3), (4);
+		CREATE TABLE t6 (id int);
+		INSERT INTO t6 VALUES (1), (2), (3), (4);
+
+		CREATE DATABASE dbex4;
+		\c dbex4
+		CREATE TABLE t7 (id int);
+		INSERT INTO t7 VALUES (1), (2), (3), (4);
+		CREATE TABLE t8 (id int);
+		INSERT INTO t8 VALUES (1), (2), (3), (4);
+
+		CREATE DATABASE db5;
+		\c db5
+		CREATE TABLE t9 (id int);
+		INSERT INTO t9 VALUES (1), (2), (3), (4);
+		CREATE TABLE t10 (id int);
+		INSERT INTO t10 VALUES (1), (2), (3), (4);
+		',
+		dump_cmd => [
+			'pg_dumpall',
+			'--format' => 'directory',
+			'--file' => "$tempdir/excluding_databases",
+			'--exclude-database' => 'dbex*',
+		],
+		restore_cmd => [
+			'pg_restore', '-C',
+			'--format' => 'directory',
+			'--file' => "$tempdir/excluding_databases.sql",
+			'--exclude-database' => 'db5',
+			"$tempdir/excluding_databases",
+		],
+		like => qr/^
+			\n\QCREATE DATABASE db1\E
+			(.*\n)*
+			\n\QCREATE TABLE public.t1 (\E
+			(.*\n)*
+			\n\QCREATE TABLE public.t2 (\E
+			(.*\n)*
+			\n\QCREATE DATABASE db2\E
+			(.*\n)*
+			\n\QCREATE TABLE public.t3 (\E
+			(.*\n)*
+			\n\QCREATE TABLE public.t4 (/xm,
+		unlike => qr/^
+			\n\QCREATE DATABASE dbex3\E
+			(.*\n)*
+			\n\QCREATE TABLE public.t5 (\E
+			(.*\n)*
+			\n\QCREATE TABLE public.t6 (\E
+			(.*\n)*
+			\n\QCREATE DATABASE dbex4\E
+			(.*\n)*
+			\n\QCREATE TABLE public.t7 (\E
+			(.*\n)*
+			\n\QCREATE TABLE public.t8 (\E
+			(.*\n)*
+			\n\QCREATE DATABASE db5\E
+			(.*\n)*
+			\n\QCREATE TABLE public.t9 (\E
+			(.*\n)*
+			\n\QCREATE TABLE public.t10 (\E
+		/xm,
+	},
+
+	format_directory => {
+		setup_sql => "CREATE TABLE format_directory(a int, b boolean, c text);
+		INSERT INTO format_directory VALUES (1, true, 'name1'), (2, false, 'name2');",
+		dump_cmd => [
+			'pg_dumpall',
+			'--format' => 'directory',
+			'--file' => "$tempdir/format_directory",
+		],
+		restore_cmd => [
+			'pg_restore', '-C',
+			'--format' => 'directory',
+			'--file' => "$tempdir/format_directory.sql",
+			"$tempdir/format_directory",
+		],
+		like => qr/^\n\QCOPY public.format_directory (a, b, c) FROM stdin;/xm
+	},
+
+	format_tar => {
+		setup_sql => "CREATE TABLE format_tar(a int, b boolean, c text);
+		INSERT INTO format_tar VALUES (1, false, 'name3'), (2, true, 'name4');",
+		dump_cmd => [
+			'pg_dumpall',
+			'--format' => 'tar',
+			'--file' => "$tempdir/format_tar",
+		],
+		restore_cmd => [
+			'pg_restore', '-C',
+			'--format' => 'tar',
+			'--file' => "$tempdir/format_tar.sql",
+			"$tempdir/format_tar",
+		],
+		like => qr/^\n\QCOPY public.format_tar (a, b, c) FROM stdin;/xm
+	},
+
+	format_custom => {
+		setup_sql => "CREATE TABLE format_custom(a int, b boolean, c text);
+		INSERT INTO format_custom VALUES (1, false, 'name5'), (2, true, 'name6');",
+		dump_cmd => [
+			'pg_dumpall',
+			'--format' => 'custom',
+			'--file' => "$tempdir/format_custom",
+		],
+		restore_cmd => [
+			'pg_restore', '-C',
+			'--format' => 'custom',
+			'--file' => "$tempdir/format_custom.sql",
+			"$tempdir/format_custom",
+		],
+		like => qr/^ \n\QCOPY public.format_custom (a, b, c) FROM stdin;/xm
+	},
+
+	dump_globals_only => {
+		setup_sql => "CREATE TABLE format_dir(a int, b boolean, c text);
+		INSERT INTO format_dir VALUES (1, false, 'name5'), (2, true, 'name6');",
+		dump_cmd => [
+			'pg_dumpall',
+			'--format' => 'directory',
+			'--globals-only',
+			'--file' => "$tempdir/dump_globals_only",
+		],
+		restore_cmd => [
+			'pg_restore', '-C', '--globals-only',
+			'--format' => 'directory',
+			'--file' => "$tempdir/dump_globals_only.sql",
+			"$tempdir/dump_globals_only",
+		],
+		like => qr/
+			^\s*\QCREATE ROLE dumpall;\E\s*\n
+		/xm
+	},);
+
+# First execute the setup_sql
+foreach my $run (sort keys %pgdumpall_runs)
+{
+	if ($pgdumpall_runs{$run}->{setup_sql})
+	{
+		$node->safe_psql($run_db, $pgdumpall_runs{$run}->{setup_sql});
+	}
+}
+
+# Execute the tests
+foreach my $run (sort keys %pgdumpall_runs)
+{
+	# Create a new target cluster for each test case run so that we don't
+	# need to clean up the target cluster after each run.
+	my $target_node = PostgreSQL::Test::Cluster->new("target_$run");
+	$target_node->init;
+	$target_node->start;
+
+	# Dumpall from node cluster.
+	$node->command_ok(\@{ $pgdumpall_runs{$run}->{dump_cmd} },
+		"$run: pg_dumpall runs");
+
+	# Restore the dump on "target_node" cluster.
+	my @restore_cmd = (
+		@{ $pgdumpall_runs{$run}->{restore_cmd} },
+		'--host', $target_node->host, '--port', $target_node->port);
+
+	my ($stdout, $stderr) = run_command(\@restore_cmd);
+
+	# pg_restore --file output file.
+	my $output_file = slurp_file("$tempdir/${run}.sql");
+
+	if (   !($pgdumpall_runs{$run}->{like})
+		&& !($pgdumpall_runs{$run}->{unlike}))
+	{
+		die "missing \"like\" or \"unlike\" in test \"$run\"";
+	}
+
+	if ($pgdumpall_runs{$run}->{like})
+	{
+		like($output_file, $pgdumpall_runs{$run}->{like}, "should dump $run");
+	}
+
+	if ($pgdumpall_runs{$run}->{unlike})
+	{
+		unlike(
+			$output_file,
+			$pgdumpall_runs{$run}->{unlike},
+			"should not dump $run");
+	}
+}
+
+# Some negative test cases exercising pg_restore on a pg_dumpall archive.
+# Test case 1: -C is not passed to pg_restore for a pg_dumpall archive.
+$node->command_fails_like(
+	[
+		'pg_restore',
+		"$tempdir/format_custom",
+		'--format' => 'custom',
+		'--file' => "$tempdir/error_test.sql",
+	],
+	qr/\Qpg_restore: error: option -C\/--create must be specified when restoring an archive created by pg_dumpall\E/,
+	'When -C is not used in pg_restore with dump of pg_dumpall');
+
+# Test case 2: --list is used with a pg_dumpall archive.
+$node->command_fails_like(
+	[
+		'pg_restore',
+		"$tempdir/format_custom", '-C',
+		'--format' => 'custom',
+		'--list',
+		'--file' => "$tempdir/error_test.sql",
+	],
+	qr/\Qpg_restore: error: option -l\/--list cannot be used when restoring an archive created by pg_dumpall\E/,
+	'When --list is used in pg_restore with dump of pg_dumpall');
+
+# Test case 3: a non-existent database is given with the -d option.
+$node->command_fails_like(
+	[
+		'pg_restore',
+		"$tempdir/format_custom", '-C',
+		'--format' => 'custom',
+		'-d' => 'dbpq',
+	],
+	qr/\QFATAL:  database "dbpq" does not exist\E/,
+	'When non-existent database is given with -d option in pg_restore with dump of pg_dumpall'
+);
+
+$node->stop('fast');
+
+done_testing();
-- 
2.39.3

