Hello, I recently wanted a way to encrypt and decrypt backups while still using the parallel dump/restore functionality. I couldn't find a way to do this, so I experimented a bit with the directory backup format. If there is in fact already a way to do this, please tell me now :-)
The idea is to add a --pipe option to pg_dump and pg_restore that specifies a custom shell command used to write or read each .dat file. Example uses include encryption with PGP and/or custom compression pipelines. In the command, %p is replaced by the path of the file to write to or read from (and %% by a literal %). The pipe command is not applied to the TOC files (toc.dat and blobs.toc), and --pipe is only usable with the directory format. The current version is attached. Could something like this be acceptable for inclusion?
From 27f6c541be6546edfef62646f514fe1a92042705 Mon Sep 17 00:00:00 2001
From: David Hedberg <david.hedb...@gmail.com>
Date: Sat, 29 Sep 2018 12:55:52 +0200
Subject: [PATCH] Add support for --pipe to pg_dump and pg_restore

With --pipe=COMMAND, each data file of a directory-format archive is
written by pg_dump, or read by pg_restore, through a shell command
started with popen() instead of being accessed directly, e.g. to
encrypt it.  In COMMAND, %p is replaced by the path of the data file
and %% by a single %.  The path is inserted without any shell quoting,
so quote %p inside COMMAND if the path may contain special characters.
toc.dat and blobs.toc are never piped.  pg_dump rejects --pipe together
with -Z/--compress, and with any format other than directory.

Known limitations (TODO):
- cfclose() is not changed by this patch, so piped streams are not
  closed with pclose() and the exit status of COMMAND is never checked;
  a failing command goes unnoticed.
- pg_restore does not check the archive format; --pipe is silently
  ignored for non-directory archives.
- The expanded command is silently truncated to MAXPGPATH - 1 bytes.
---
 src/bin/pg_dump/compress_io.c         | 112 +++++++++++++++++++++++---
 src/bin/pg_dump/compress_io.h         |   6 +-
 src/bin/pg_dump/pg_backup.h           |   2 +
 src/bin/pg_dump/pg_backup_directory.c |  14 ++--
 src/bin/pg_dump/pg_dump.c             |  17 +++-
 src/bin/pg_dump/pg_restore.c          |   7 ++
 6 files changed, 135 insertions(+), 23 deletions(-)

diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index a96da15dc1..64c06d7eae 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -443,6 +443,9 @@ struct cfp
 static int	hasSuffix(const char *filename, const char *suffix);
 #endif
 
+static void
+expand_shell_command(char *buf, size_t bufsize, const char *cmd, const char *filepath);
+
 /* free() without changing errno; useful in several places below */
 static void
 free_keep_errno(void *p)
@@ -464,24 +467,27 @@ free_keep_errno(void *p)
  * On failure, return NULL with an error code in errno.
  */
 cfp *
-cfopen_read(const char *path, const char *mode)
+cfopen_read(const char *path, const char *mode, const char *pipecmd)
 {
 	cfp		   *fp;
 
+	/* with a pipe command, skip the ".gz" detection below */
+	if (pipecmd)
+		fp = cfopen(path, mode, 0, pipecmd);
 #ifdef HAVE_LIBZ
-	if (hasSuffix(path, ".gz"))
-		fp = cfopen(path, mode, 1);
-	else
+	else if (hasSuffix(path, ".gz"))
+		fp = cfopen(path, mode, 1, NULL);
 #endif
+	else
 	{
-		fp = cfopen(path, mode, 0);
+		fp = cfopen(path, mode, 0, NULL);
 #ifdef HAVE_LIBZ
 		if (fp == NULL)
 		{
 			char	   *fname;
 
 			fname = psprintf("%s.gz", path);
-			fp = cfopen(fname, mode, 1);
+			fp = cfopen(fname, mode, 1, NULL);
 			free_keep_errno(fname);
 		}
 #endif
@@ -501,19 +507,19 @@ cfopen_read(const char *path, const char *mode)
  * On failure, return NULL with an error code in errno.
  */
 cfp *
-cfopen_write(const char *path, const char *mode, int compression)
+cfopen_write(const char *path, const char *mode, int compression, const char *pipecmd)
 {
 	cfp		   *fp;
 
 	if (compression == 0)
-		fp = cfopen(path, mode, 0);
+		fp = cfopen(path, mode, 0, pipecmd);
 	else
 	{
 #ifdef HAVE_LIBZ
 		char	   *fname;
 
 		fname = psprintf("%s.gz", path);
-		fp = cfopen(fname, mode, compression);
+		fp = cfopen(fname, mode, compression, pipecmd);
 		free_keep_errno(fname);
 #else
 		exit_horribly(modulename, "not built with zlib support\n");
@@ -530,11 +536,41 @@ cfopen_write(const char *path, const char *mode, int compression)
  * On failure, return NULL with an error code in errno.
  */
 cfp *
-cfopen(const char *path, const char *mode, int compression)
+cfopen(const char *path, const char *mode, int compression, const char *pipecmd)
 {
 	cfp		   *fp = pg_malloc(sizeof(cfp));
 
-	if (compression != 0)
+	if (pipecmd)
+	{
+		char		cmd[MAXPGPATH];
+		const char *pmode;
+
+		/*
+		 * A pipe is one-way, so only read and write modes make sense.  Use
+		 * the PG_BINARY_* modes so that on Windows the pipe is opened in
+		 * binary mode; a text-mode pipe would mangle the data.
+		 */
+		if (mode[0] == 'r')
+			pmode = PG_BINARY_R;
+		else if (mode[0] == 'w')
+			pmode = PG_BINARY_W;
+		else
+			exit_horribly(modulename, "pipe does not support mode \"%s\"\n",
+						  mode);
+
+		expand_shell_command(cmd, MAXPGPATH, pipecmd, path);
+
+#ifdef HAVE_LIBZ
+		fp->compressedfp = NULL;
+#endif
+		fp->uncompressedfp = popen(cmd, pmode);
+		if (fp->uncompressedfp == NULL)
+		{
+			free_keep_errno(fp);
+			fp = NULL;
+		}
+	}
+	else if (compression != 0)
 	{
 #ifdef HAVE_LIBZ
 		if (compression != Z_DEFAULT_COMPRESSION)
@@ -731,5 +767,57 @@ hasSuffix(const char *filename, const char *suffix)
 				   suffix,
 				   suffixlen) == 0;
 }
-
 #endif
+
+/*
+ * Expand a shell command
+ *
+ * Replaces %p in cmd with filepath (inserted verbatim, without any shell
+ * quoting) and %% with a single %, writing the result to buf.  Any other %
+ * is copied unchanged.  Output longer than bufsize - 1 bytes is silently
+ * truncated.
+ */
+static void
+expand_shell_command(char *buf, size_t bufsize, const char *cmd, const char *filepath)
+{
+	char	   *dp;
+	char	   *endp;
+	const char *sp;
+
+	dp = buf;
+	endp = buf + bufsize - 1;
+	*endp = '\0';
+
+	for (sp = cmd; *sp; sp++)
+	{
+		if (*sp == '%')
+		{
+			switch (sp[1])
+			{
+				case 'p':
+					/* %p: path of the file, as passed by the caller */
+					sp++;
+					strlcpy(dp, filepath, endp - dp);
+					dp += strlen(dp);
+					break;
+				case '%':
+					/* convert %% to a single % */
+					sp++;
+					if (dp < endp)
+						*dp++ = *sp;
+					break;
+				default:
+					/* otherwise treat the % as not special */
+					if (dp < endp)
+						*dp++ = *sp;
+					break;
+			}
+		}
+		else
+		{
+			if (dp < endp)
+				*dp++ = *sp;
+		}
+	}
+	*dp = '\0';
+}
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index 10fde8bdef..8a09086f96 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -56,9 +56,9 @@ extern void EndCompressor(ArchiveHandle *AH, CompressorState *cs);
 
 typedef struct cfp cfp;
 
-extern cfp *cfopen(const char *path, const char *mode, int compression);
-extern cfp *cfopen_read(const char *path, const char *mode);
-extern cfp *cfopen_write(const char *path, const char *mode, int compression);
+extern cfp *cfopen(const char *path, const char *mode, int compression, const char *pipecmd);
+extern cfp *cfopen_read(const char *path, const char *mode, const char *pipecmd);
+extern cfp *cfopen_write(const char *path, const char *mode, int compression, const char *pipecmd);
 extern int	cfread(void *ptr, int size, cfp *fp);
 extern int	cfwrite(const void *ptr, int size, cfp *fp);
 extern int	cfgetc(cfp *fp);
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index ba798213be..9aba93f923 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -87,6 +87,7 @@ typedef struct _restoreOptions
 	int			verbose;
 	int			aclsSkip;
 	const char *lockWaitTimeout;
+	const char *pipeCommand;
 	int			include_everything;
 
 	int			tocSummary;
@@ -141,6 +142,7 @@ typedef struct _dumpOptions
 	int			dumpSections;	/* bitmask of chosen sections */
 	bool		aclsSkip;
 	const char *lockWaitTimeout;
+	const char *pipeCommand;
 
 	/* flags for various command-line long options */
 	int			disable_dollar_quoting;
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index cda90b9a2a..c9a3f5db52 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -205,7 +205,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
 
 		setFilePath(AH, fname, "toc.dat");
 
-		tocFH = cfopen_read(fname, PG_BINARY_R);
+		tocFH = cfopen_read(fname, PG_BINARY_R, NULL);
 		if (tocFH == NULL)
 			exit_horribly(modulename,
 						  "could not open input file \"%s\": %s\n",
@@ -333,7 +333,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
 
 	setFilePath(AH, fname, tctx->filename);
 
-	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
+	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression, AH->public.dopt->pipeCommand);
 	if (ctx->dataFH == NULL)
 		exit_horribly(modulename, "could not open output file \"%s\": %s\n",
 					  fname, strerror(errno));
@@ -392,7 +392,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
 	if (!filename)
 		return;
 
-	cfp = cfopen_read(filename, PG_BINARY_R);
+	cfp = cfopen_read(filename, PG_BINARY_R, AH->public.ropt->pipeCommand);
 
 	if (!cfp)
 		exit_horribly(modulename, "could not open input file \"%s\": %s\n",
@@ -446,7 +446,7 @@ _LoadBlobs(ArchiveHandle *AH)
 
 	setFilePath(AH, fname, "blobs.toc");
 
-	ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R);
+	ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R, NULL);
 	if (ctx->blobsTocFH == NULL)
 		exit_horribly(modulename,
 					  "could not open large object TOC file \"%s\" for input: %s\n",
@@ -579,7 +579,7 @@ _CloseArchive(ArchiveHandle *AH)
 		ctx->pstate = ParallelBackupStart(AH);
 
 		/* The TOC is always created uncompressed */
-		tocFH = cfopen_write(fname, PG_BINARY_W, 0);
+		tocFH = cfopen_write(fname, PG_BINARY_W, 0, NULL);
 		if (tocFH == NULL)
 			exit_horribly(modulename, "could not open output file \"%s\": %s\n",
 						  fname, strerror(errno));
@@ -644,7 +644,7 @@ _StartBlobs(ArchiveHandle *AH, TocEntry *te)
 	setFilePath(AH, fname, "blobs.toc");
 
 	/* The blob TOC file is never compressed */
-	ctx->blobsTocFH = cfopen_write(fname, "ab", 0);
+	ctx->blobsTocFH = cfopen_write(fname, "ab", 0, NULL);
 	if (ctx->blobsTocFH == NULL)
 		exit_horribly(modulename, "could not open output file \"%s\": %s\n",
 					  fname, strerror(errno));
@@ -663,7 +663,7 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
 
 	snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
 
-	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
+	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression, AH->public.dopt->pipeCommand);
 
 	if (ctx->dataFH == NULL)
 		exit_horribly(modulename, "could not open output file \"%s\": %s\n",
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index c8d01ed4a4..5fe9619055 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -378,6 +378,7 @@ main(int argc, char **argv)
 		{"no-subscriptions", no_argument, &dopt.no_subscriptions, 1},
 		{"no-sync", no_argument, NULL, 7},
 		{"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1},
+		{"pipe", required_argument, NULL, 8},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -562,6 +563,10 @@ main(int argc, char **argv)
 				dosync = false;
 				break;
 
+			case 8:				/* pipe */
+				dopt.pipeCommand = pg_strdup(optarg);
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
 				exit_nicely(1);
@@ -629,11 +634,15 @@ main(int argc, char **argv)
 	if (archiveFormat == archNull)
 		plainText = 1;
 
+	/* Data files are either piped or compressed, never both */
+	if (dopt.pipeCommand && compressLevel != -1)
+		exit_horribly(NULL, "options --pipe and -Z/--compress cannot be used together\n");
+
 	/* Custom and directory formats are compressed by default, others not */
 	if (compressLevel == -1)
 	{
 #ifdef HAVE_LIBZ
-		if (archiveFormat == archCustom || archiveFormat == archDirectory)
+		if (!dopt.pipeCommand && (archiveFormat == archCustom || archiveFormat == archDirectory))
 			compressLevel = Z_DEFAULT_COMPRESSION;
 		else
 #endif
@@ -670,6 +679,10 @@ main(int argc, char **argv)
 	if (archiveFormat != archDirectory && numWorkers > 1)
 		exit_horribly(NULL, "parallel backup only supported by the directory format\n");
 
+	/* Pipe only in the directory archive format so far */
+	if (archiveFormat != archDirectory && dopt.pipeCommand)
+		exit_horribly(NULL, "pipe only supported by the directory format\n");
+
 	/* Open the output file */
 	fout = CreateArchive(filename, archiveFormat, compressLevel, dosync,
 						 archiveMode, setupDumpWorker);
@@ -998,6 +1011,8 @@ help(const char *progname)
 	printf(_("  --use-set-session-authorization\n"
 			 "                               use SET SESSION AUTHORIZATION commands instead of\n"
 			 "                               ALTER OWNER commands to set ownership\n"));
+	printf(_("  --pipe=COMMAND               Create files by piping data to the given command\n"
+			 "                               Only usable with the directory format\n"));
 
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=DBNAME      database to dump\n"));
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index 501d7cea72..2f0e7b03d8 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -124,6 +124,7 @@ main(int argc, char **argv)
 		{"no-publications", no_argument, &no_publications, 1},
 		{"no-security-labels", no_argument, &no_security_labels, 1},
 		{"no-subscriptions", no_argument, &no_subscriptions, 1},
+		{"pipe", required_argument, NULL, 4},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -281,6 +282,10 @@ main(int argc, char **argv)
 				set_dump_section(optarg, &(opts->dumpSections));
 				break;
 
+			case 4:				/* pipe */
+				opts->pipeCommand = pg_strdup(optarg);
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
 				exit_nicely(1);
@@ -496,6 +501,8 @@ usage(const char *progname)
 	printf(_("  --use-set-session-authorization\n"
 			 "                               use SET SESSION AUTHORIZATION commands instead of\n"
 			 "                               ALTER OWNER commands to set ownership\n"));
+	printf(_("  --pipe=COMMAND               Read files using the output from the given command\n"
+			 "                               Only usable with the directory format\n"));
 
 	printf(_("\nConnection options:\n"));
 	printf(_("  -h, --host=HOSTNAME      database server host or socket directory\n"));
-- 
2.17.1