Hello,

I recently wanted a way to encrypt/decrypt backups while still
utilizing the parallel dump/restore functionality. I couldn't see a
way to do this so I experimented a bit with the directory backup
format. If there's in fact already a way to do this, please tell me
now :-)

The idea is to add a --pipe option to pg_dump / pg_restore where you
can specify a custom shell command that is used to write / read each
.dat-file. Usage examples include encryption with pgp and/or custom
compression pipelines. %p in the command is expanded to the path to
write to / read from. The pipe command is not applied to the toc.

The current version is attached. Could something like this be
acceptable for inclusion?
From 27f6c541be6546edfef62646f514fe1a92042705 Mon Sep 17 00:00:00 2001
From: David Hedberg <david.hedb...@gmail.com>
Date: Sat, 29 Sep 2018 12:55:52 +0200
Subject: [PATCH] Add support for --pipe to pg_dump and pg_restore

---
 src/bin/pg_dump/compress_io.c         | 97 ++++++++++++++++++++++++---
 src/bin/pg_dump/compress_io.h         |  6 +-
 src/bin/pg_dump/pg_backup.h           |  2 +
 src/bin/pg_dump/pg_backup_directory.c | 14 ++--
 src/bin/pg_dump/pg_dump.c             | 17 ++++-
 src/bin/pg_dump/pg_restore.c          |  7 ++
 6 files changed, 121 insertions(+), 22 deletions(-)

diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index a96da15dc1..64c06d7eae 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -443,6 +443,9 @@ struct cfp
 static int	hasSuffix(const char *filename, const char *suffix);
 #endif
 
+static void
+expand_shell_command(char *buf, size_t bufsize, const char *cmd, const char *filepath);
+
 /* free() without changing errno; useful in several places below */
 static void
 free_keep_errno(void *p)
@@ -464,24 +467,26 @@ free_keep_errno(void *p)
  * On failure, return NULL with an error code in errno.
  */
 cfp *
-cfopen_read(const char *path, const char *mode)
+cfopen_read(const char *path, const char *mode, const char *pipecmd)
 {
 	cfp		   *fp;
 
+	if (pipecmd)				/* return here: in non-zlib builds nothing below is guarded by "else" */
+		return cfopen(path, mode, 0, pipecmd);
 #ifdef HAVE_LIBZ
-	if (hasSuffix(path, ".gz"))
-		fp = cfopen(path, mode, 1);
+	else if (hasSuffix(path, ".gz"))
+		fp = cfopen(path, mode, 1, NULL);
 	else
 #endif
 	{
-		fp = cfopen(path, mode, 0);
+		fp = cfopen(path, mode, 0, NULL);
 #ifdef HAVE_LIBZ
 		if (fp == NULL)
 		{
 			char	   *fname;
 
 			fname = psprintf("%s.gz", path);
-			fp = cfopen(fname, mode, 1);
+			fp = cfopen(fname, mode, 1, NULL);
 			free_keep_errno(fname);
 		}
 #endif
@@ -501,19 +506,19 @@ cfopen_read(const char *path, const char *mode)
  * On failure, return NULL with an error code in errno.
  */
 cfp *
-cfopen_write(const char *path, const char *mode, int compression)
+cfopen_write(const char *path, const char *mode, int compression, const char *pipecmd)
 {
 	cfp		   *fp;
 
 	if (compression == 0)
-		fp = cfopen(path, mode, 0);
+		fp = cfopen(path, mode, 0, pipecmd);
 	else
 	{
 #ifdef HAVE_LIBZ
 		char	   *fname;
 
 		fname = psprintf("%s.gz", path);
-		fp = cfopen(fname, mode, compression);
+		fp = cfopen(fname, mode, compression, pipecmd);
 		free_keep_errno(fname);
 #else
 		exit_horribly(modulename, "not built with zlib support\n");
@@ -530,11 +535,32 @@ cfopen_write(const char *path, const char *mode, int compression)
  * On failure, return NULL with an error code in errno.
  */
 cfp *
-cfopen(const char *path, const char *mode, int compression)
+cfopen(const char *path, const char *mode, int compression, const char *pipecmd)
 {
 	cfp		   *fp = pg_malloc(sizeof(cfp));
 
-	if (compression != 0)
+	if (pipecmd)
+	{
+		char		cmd[MAXPGPATH];
+		char		pmode[2];
+
+		/* popen() accepts only "r" or "w": keep the first mode letter, drop any 'b' */
+		if (mode[0] != 'r' && mode[0] != 'w')
+			exit_horribly(modulename, "Pipe does not support mode %s\n", mode);
+		pmode[0] = mode[0];
+		pmode[1] = '\0';
+		expand_shell_command(cmd, MAXPGPATH, pipecmd, path);
+#ifdef HAVE_LIBZ
+		fp->compressedfp = NULL;
+#endif
+		/* NOTE(review): a popen() stream needs pclose(); this patch does not touch cfclose() */
+		fp->uncompressedfp = popen(cmd, pmode);
+		if (fp->uncompressedfp == NULL)
+		{
+			free_keep_errno(fp);
+			fp = NULL;			/* return NULL with errno set; never write to freed memory */
+		}
+	}
+	else if (compression != 0)
 	{
 #ifdef HAVE_LIBZ
 		if (compression != Z_DEFAULT_COMPRESSION)
@@ -731,5 +757,54 @@ hasSuffix(const char *filename, const char *suffix)
 				  suffix,
 				  suffixlen) == 0;
 }
-
 #endif
+
+/*
+ * Expand a shell command
+ *
+ * Copies cmd into buf (bufsize bytes including the terminating NUL),
+ * replacing %p with filepath and %% with a single %.  A % followed by
+ * anything else is copied unchanged.  filepath is inserted verbatim, so the
+ * command itself has to quote %p if paths may contain shell metacharacters.
+ *
+ * A truncated command must never reach the shell, so an expansion that does
+ * not fit in buf is a fatal error instead of being silently cut short.
+ */
+static void
+expand_shell_command(char *buf, size_t bufsize, const char *cmd, const char *filepath)
+{
+	char	   *dp = buf;
+	char	   *endp = buf + bufsize - 1;	/* last byte, kept for the NUL */
+	const char *sp;
+
+	for (sp = cmd; *sp; sp++)
+	{
+		if (sp[0] == '%' && sp[1] == 'p')
+		{
+			/* %p: path of the file to read from / write to */
+			size_t		pathlen = strlen(filepath);
+
+			if (pathlen > (size_t) (endp - dp))
+				break;			/* no room: sp stays on the '%' */
+			memcpy(dp, filepath, pathlen);
+			dp += pathlen;
+			sp++;				/* skip the 'p' */
+			continue;
+		}
+
+		/* convert %% to a single %; any other % is not special */
+		if (sp[0] == '%' && sp[1] == '%')
+			sp++;
+
+		if (dp >= endp)
+			break;				/* no room: sp stays on the unwritten byte */
+		*dp++ = *sp;
+	}
+	*dp = '\0';
+
+	/* The loop only stops before the end of cmd when buf was too small. */
+	if (*sp != '\0')
+		exit_horribly(modulename,
+					  "pipe command for file \"%s\" is too long\n",
+					  filepath);
+}
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index 10fde8bdef..8a09086f96 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -56,9 +56,9 @@ extern void EndCompressor(ArchiveHandle *AH, CompressorState *cs);
 
 typedef struct cfp cfp;
 
-extern cfp *cfopen(const char *path, const char *mode, int compression);
-extern cfp *cfopen_read(const char *path, const char *mode);
-extern cfp *cfopen_write(const char *path, const char *mode, int compression);
+extern cfp *cfopen(const char *path, const char *mode, int compression, const char *pipecmd);
+extern cfp *cfopen_read(const char *path, const char *mode, const char *pipecmd);
+extern cfp *cfopen_write(const char *path, const char *mode, int compression, const char *pipecmd);
 extern int	cfread(void *ptr, int size, cfp *fp);
 extern int	cfwrite(const void *ptr, int size, cfp *fp);
 extern int	cfgetc(cfp *fp);
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index ba798213be..9aba93f923 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -87,6 +87,7 @@ typedef struct _restoreOptions
 	int			verbose;
 	int			aclsSkip;
 	const char *lockWaitTimeout;
+	const char *pipeCommand;
 	int			include_everything;
 
 	int			tocSummary;
@@ -141,6 +142,7 @@ typedef struct _dumpOptions
 	int			dumpSections;	/* bitmask of chosen sections */
 	bool		aclsSkip;
 	const char *lockWaitTimeout;
+	const char *pipeCommand;
 
 	/* flags for various command-line long options */
 	int			disable_dollar_quoting;
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index cda90b9a2a..c9a3f5db52 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -205,7 +205,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
 
 		setFilePath(AH, fname, "toc.dat");
 
-		tocFH = cfopen_read(fname, PG_BINARY_R);
+		tocFH = cfopen_read(fname, PG_BINARY_R, NULL);
 		if (tocFH == NULL)
 			exit_horribly(modulename,
 						  "could not open input file \"%s\": %s\n",
@@ -333,7 +333,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
 
 	setFilePath(AH, fname, tctx->filename);
 
-	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
+	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression, AH->public.dopt->pipeCommand);
 	if (ctx->dataFH == NULL)
 		exit_horribly(modulename, "could not open output file \"%s\": %s\n",
 					  fname, strerror(errno));
@@ -392,7 +392,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
 	if (!filename)
 		return;
 
-	cfp = cfopen_read(filename, PG_BINARY_R);
+	cfp = cfopen_read(filename, PG_BINARY_R, AH->public.ropt->pipeCommand);
 
 	if (!cfp)
 		exit_horribly(modulename, "could not open input file \"%s\": %s\n",
@@ -446,7 +446,7 @@ _LoadBlobs(ArchiveHandle *AH)
 
 	setFilePath(AH, fname, "blobs.toc");
 
-	ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R);
+	ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R, NULL);
 
 	if (ctx->blobsTocFH == NULL)
 		exit_horribly(modulename, "could not open large object TOC file \"%s\" for input: %s\n",
@@ -579,7 +579,7 @@ _CloseArchive(ArchiveHandle *AH)
 		ctx->pstate = ParallelBackupStart(AH);
 
 		/* The TOC is always created uncompressed */
-		tocFH = cfopen_write(fname, PG_BINARY_W, 0);
+		tocFH = cfopen_write(fname, PG_BINARY_W, 0, NULL);
 		if (tocFH == NULL)
 			exit_horribly(modulename, "could not open output file \"%s\": %s\n",
 						  fname, strerror(errno));
@@ -644,7 +644,7 @@ _StartBlobs(ArchiveHandle *AH, TocEntry *te)
 	setFilePath(AH, fname, "blobs.toc");
 
 	/* The blob TOC file is never compressed */
-	ctx->blobsTocFH = cfopen_write(fname, "ab", 0);
+	ctx->blobsTocFH = cfopen_write(fname, "ab", 0, NULL);
 	if (ctx->blobsTocFH == NULL)
 		exit_horribly(modulename, "could not open output file \"%s\": %s\n",
 					  fname, strerror(errno));
@@ -663,7 +663,7 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
 
 	snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
 
-	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
+	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression, AH->public.dopt->pipeCommand);
 
 	if (ctx->dataFH == NULL)
 		exit_horribly(modulename, "could not open output file \"%s\": %s\n",
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index c8d01ed4a4..5fe9619055 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -378,6 +378,7 @@ main(int argc, char **argv)
 		{"no-subscriptions", no_argument, &dopt.no_subscriptions, 1},
 		{"no-sync", no_argument, NULL, 7},
 		{"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1},
+		{"pipe", required_argument, NULL, 8},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -562,6 +563,10 @@ main(int argc, char **argv)
 				dosync = false;
 				break;
 
+			case 8:				/* pipe */
+				dopt.pipeCommand = pg_strdup(optarg);
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
 				exit_nicely(1);
@@ -629,11 +634,15 @@ main(int argc, char **argv)
 	if (archiveFormat == archNull)
 		plainText = 1;
 
+	if (dopt.pipeCommand && compressLevel != -1) {
+		exit_horribly(NULL, "option --pipe and --compress cannot be used together\n");
+	}
+
 	/* Custom and directory formats are compressed by default, others not */
 	if (compressLevel == -1)
 	{
 #ifdef HAVE_LIBZ
-		if (archiveFormat == archCustom || archiveFormat == archDirectory)
+		if (!dopt.pipeCommand && (archiveFormat == archCustom || archiveFormat == archDirectory))
 			compressLevel = Z_DEFAULT_COMPRESSION;
 		else
 #endif
@@ -670,6 +679,10 @@ main(int argc, char **argv)
 	if (archiveFormat != archDirectory && numWorkers > 1)
 		exit_horribly(NULL, "parallel backup only supported by the directory format\n");
 
+	/* Pipe only in the directory archive format so far */
+	if (archiveFormat != archDirectory && dopt.pipeCommand)
+		exit_horribly(NULL, "pipe only supported by the directory format\n");
+
 	/* Open the output file */
 	fout = CreateArchive(filename, archiveFormat, compressLevel, dosync,
 						 archiveMode, setupDumpWorker);
@@ -998,6 +1011,8 @@ help(const char *progname)
 	printf(_("  --use-set-session-authorization\n"
 			 "                               use SET SESSION AUTHORIZATION commands instead of\n"
 			 "                               ALTER OWNER commands to set ownership\n"));
+	printf(_("  --pipe=COMMAND               Create files by piping data to the given command\n"
+			 "                               Only usable with the directory format\n"));
 
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=DBNAME      database to dump\n"));
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index 501d7cea72..2f0e7b03d8 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -124,6 +124,7 @@ main(int argc, char **argv)
 		{"no-publications", no_argument, &no_publications, 1},
 		{"no-security-labels", no_argument, &no_security_labels, 1},
 		{"no-subscriptions", no_argument, &no_subscriptions, 1},
+		{"pipe", required_argument, NULL, 4},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -281,6 +282,10 @@ main(int argc, char **argv)
 				set_dump_section(optarg, &(opts->dumpSections));
 				break;
 
+			case 4:				/* pipe */
+				opts->pipeCommand = pg_strdup(optarg);
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
 				exit_nicely(1);
@@ -496,6 +501,8 @@ usage(const char *progname)
 	printf(_("  --use-set-session-authorization\n"
 			 "                               use SET SESSION AUTHORIZATION commands instead of\n"
 			 "                               ALTER OWNER commands to set ownership\n"));
+	printf(_("  --pipe=COMMAND               Read files using the output from the given command\n"
+			 "                               Only usable with the directory format\n"));
 
 	printf(_("\nConnection options:\n"));
 	printf(_("  -h, --host=HOSTNAME      database server host or socket directory\n"));
-- 
2.17.1

Reply via email to