On Thu, Sep 1, 2016 at 2:39 PM, Michael Paquier <michael.paqu...@gmail.com>
wrote:

> On Thu, Sep 1, 2016 at 5:13 PM, Magnus Hagander <mag...@hagander.net>
> wrote:
> > We don't seem to check for similar issues as the one just found in the
> > existing tests though, do we? As in, we don't actually verify that the
> xlog
> > files being streamed are 16Mb? (Or for that matter that the tarfile
> emitted
> > by -Ft is actually a tarfile?) Or am I missing some magic somewhere? :)
>
> No. There is no checks on the WAL file size (you should use the output
> of pg_controldata to see how large the segments should be). For the
> tar file, the complication is in its untar... Perl provides some ways
> to untar things, though the oldest version that we support in the TAP
> tests does not offer that :(
>

Ugh. That would be nice to have, but I think that's outside the scope of
this patch.

PFA is an updated version of this patch that:
* documents a magic value passed to zlib (which is in their documentation
as being a magic value, but has no define)
* fixes the padding of tar files
* adds a most basic test that the -X stream -Ft does produce a tarfile

As for using XLOGDIR to drive the name of the tarfile. pg_basebackup is
already hardcoded to use pg_xlog. And so are the tests. We probably want to
fix that, but that's a separate step and this patch will be easier to
review and test if we keep it out for now.

-- 
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index 03615da..981d201 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -180,7 +180,8 @@ PostgreSQL documentation
             target directory, the tar contents will be written to
             standard output, suitable for piping to for example
             <productname>gzip</productname>. This is only possible if
-            the cluster has no additional tablespaces.
+            the cluster has no additional tablespaces and transaction
+            log streaming is not used.
            </para>
            </listitem>
          </varlistentry>
@@ -323,6 +324,10 @@ PostgreSQL documentation
              If the log has been rotated when it's time to transfer it, the
              backup will fail and be unusable.
            </para>
+           <para>
+            The transaction log files will be written to
+             the <filename>base.tar</filename> file.
+           </para>
           </listitem>
          </varlistentry>
 
@@ -339,6 +344,9 @@ PostgreSQL documentation
              client can keep up with transaction log received, using this mode
              requires no extra transaction logs to be saved on the master.
            </para>
+           <para>The transactionn log files are written to a separate file
+            called <filename>pg_xlog.tar</filename>.
+           </para>
           </listitem>
          </varlistentry>
         </variablelist>
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index fa1ce8b..52ac9e9 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -19,7 +19,7 @@ include $(top_builddir)/src/Makefile.global
 override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 LDFLAGS += -L$(top_builddir)/src/fe_utils -lpgfeutils -lpq
 
-OBJS=receivelog.o streamutil.o $(WIN32RES)
+OBJS=receivelog.o streamutil.o walmethods.o $(WIN32RES)
 
 all: pg_basebackup pg_receivexlog pg_recvlogical
 
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 351a420..58c0821 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -365,7 +365,7 @@ typedef struct
 {
 	PGconn	   *bgconn;
 	XLogRecPtr	startptr;
-	char		xlogdir[MAXPGPATH];
+	char		xlog[MAXPGPATH];	/* directory or tarfile depending on mode */
 	char	   *sysidentifier;
 	int			timeline;
 } logstreamer_param;
@@ -383,9 +383,13 @@ LogStreamerMain(logstreamer_param *param)
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = false;
 	stream.mark_done = true;
-	stream.basedir = param->xlogdir;
 	stream.partial_suffix = NULL;
 
+	if (format == 'p')
+		stream.walmethod = CreateWalDirectoryMethod(param->xlog);
+	else
+		stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel);
+
 	if (!ReceiveXlogStream(param->bgconn, &stream))
 
 		/*
@@ -395,6 +399,14 @@ LogStreamerMain(logstreamer_param *param)
 		 */
 		return 1;
 
+	if (!stream.walmethod->finish())
+	{
+		fprintf(stderr,
+				_("%s: could not finish writing WAL files: %s\n"),
+				progname, strerror(errno));
+		return 1;
+	}
+
 	PQfinish(param->bgconn);
 	return 0;
 }
@@ -445,22 +457,25 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
 		/* Error message already written in GetConnection() */
 		exit(1);
 
-	snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
-
-	/*
-	 * Create pg_xlog/archive_status (and thus pg_xlog) so we can write to
-	 * basedir/pg_xlog as the directory entry in the tar file may arrive
-	 * later.
-	 */
-	snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
-			 basedir);
+	snprintf(param->xlog, sizeof(param->xlog), "%s/pg_xlog", basedir);
 
-	if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
+	if (format == 'p')
 	{
-		fprintf(stderr,
-				_("%s: could not create directory \"%s\": %s\n"),
-				progname, statusdir, strerror(errno));
-		disconnect_and_exit(1);
+		/*
+		 * Create pg_xlog/archive_status (and thus pg_xlog) so we can write to
+		 * basedir/pg_xlog as the directory entry in the tar file may arrive
+		 * later.
+		 */
+		snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
+				 basedir);
+
+		if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
+		{
+			fprintf(stderr,
+					_("%s: could not create directory \"%s\": %s\n"),
+					progname, statusdir, strerror(errno));
+			disconnect_and_exit(1);
+		}
 	}
 
 	/*
@@ -2110,16 +2125,6 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
-	if (format != 'p' && streamwal)
-	{
-		fprintf(stderr,
-				_("%s: WAL streaming can only be used in plain mode\n"),
-				progname);
-		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
-				progname);
-		exit(1);
-	}
-
 	if (replication_slot && !streamwal)
 	{
 		fprintf(stderr,
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 7f7ee9d..9b4c101 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -337,11 +337,19 @@ StreamLog(void)
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = synchronous;
 	stream.mark_done = false;
-	stream.basedir = basedir;
+	stream.walmethod = CreateWalDirectoryMethod(basedir);
 	stream.partial_suffix = ".partial";
 
 	ReceiveXlogStream(conn, &stream);
 
+	if (!stream.walmethod->finish())
+	{
+		fprintf(stderr,
+				_("%s: could not finish writing WAL files: %s\n"),
+				progname, strerror(errno));
+		return;
+	}
+
 	PQfinish(conn);
 	conn = NULL;
 }
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 062730b..9197eeb 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -26,7 +26,7 @@
 
 
 /* fd and filename for currently open WAL file */
-static int	walfile = -1;
+static Walfile *walfile = NULL;
 static char current_walfile_name[MAXPGPATH] = "";
 static bool reportFlushPosition = false;
 static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
@@ -37,7 +37,7 @@ static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
 				 XLogRecPtr *stoppos);
 static int	CopyStreamPoll(PGconn *conn, long timeout_ms);
 static int	CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
-static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 					XLogRecPtr blockpos, int64 *last_status);
 static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 				   XLogRecPtr *blockpos);
@@ -52,33 +52,33 @@ static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
 						 uint32 *timeline);
 
 static bool
-mark_file_as_archived(const char *basedir, const char *fname)
+mark_file_as_archived(StreamCtl *stream, const char *fname)
 {
-	int			fd;
+	Walfile    *f;
 	static char tmppath[MAXPGPATH];
 
-	snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
-			 basedir, fname);
+	snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
+			 fname);
 
-	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-	if (fd < 0)
+	f = stream->walmethod->open_for_write(tmppath, NULL, 0);
+	if (f == NULL)
 	{
 		fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
+				progname, tmppath, stream->walmethod->getlasterror());
 		return false;
 	}
 
-	if (fsync(fd) != 0)
+	if (stream->walmethod->fsync(f) != 0)
 	{
 		fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
+				progname, tmppath, stream->walmethod->getlasterror());
 
-		close(fd);
+		stream->walmethod->close(f, CLOSE_UNLINK);
 
 		return false;
 	}
 
-	close(fd);
+	stream->walmethod->close(f, CLOSE_NORMAL);
 
 	return true;
 }
@@ -92,79 +92,65 @@ mark_file_as_archived(const char *basedir, const char *fname)
 static bool
 open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
 {
-	int			f;
+	Walfile    *f;
 	char		fn[MAXPGPATH];
-	struct stat statbuf;
-	char	   *zerobuf;
-	int			bytes;
+	ssize_t		size;
 	XLogSegNo	segno;
 
 	XLByteToSeg(startpoint, segno);
 	XLogFileName(current_walfile_name, stream->timeline, segno);
 
-	snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name,
+	snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
 			 stream->partial_suffix ? stream->partial_suffix : "");
-	f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-	if (f == -1)
-	{
-		fprintf(stderr,
-				_("%s: could not open transaction log file \"%s\": %s\n"),
-				progname, fn, strerror(errno));
-		return false;
-	}
-
-	/*
-	 * Verify that the file is either empty (just created), or a complete
-	 * XLogSegSize segment. Anything in between indicates a corrupt file.
-	 */
-	if (fstat(f, &statbuf) != 0)
-	{
-		fprintf(stderr,
-				_("%s: could not stat transaction log file \"%s\": %s\n"),
-				progname, fn, strerror(errno));
-		close(f);
-		return false;
-	}
-	if (statbuf.st_size == XLogSegSize)
-	{
-		/* File is open and ready to use */
-		walfile = f;
-		return true;
-	}
-	if (statbuf.st_size != 0)
-	{
-		fprintf(stderr,
-				_("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
-				progname, fn, (int) statbuf.st_size, XLogSegSize);
-		close(f);
-		return false;
-	}
 
-	/* New, empty, file. So pad it to 16Mb with zeroes */
-	zerobuf = pg_malloc0(XLOG_BLCKSZ);
-	for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
+	if (stream->walmethod->existsfile(fn))
 	{
-		if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+		/*
+		 * Verify that the file is either empty (just created), or a complete
+		 * XLogSegSize segment. Anything in between indicates a corrupt file.
+		 */
+		size = stream->walmethod->get_file_size(fn);
+		if (size < 0)
+		{
+			fprintf(stderr,
+			_("%s: could not get size of transaction log file \"%s\": %s\n"),
+					progname, fn, stream->walmethod->getlasterror());
+			return false;
+		}
+		if (size == XLogSegSize)
+		{
+			/* Already padded file. Open it for use */
+			f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
+			if (f == NULL)
+			{
+				fprintf(stderr,
+						_("%s: could not open existing transaction log file \"%s\": %s\n"),
+						progname, fn, stream->walmethod->getlasterror());
+				return false;
+			}
+			walfile = f;
+			return true;
+		}
+		if (size != 0)
 		{
 			fprintf(stderr,
-					_("%s: could not pad transaction log file \"%s\": %s\n"),
-					progname, fn, strerror(errno));
-			free(zerobuf);
-			close(f);
-			unlink(fn);
+					_("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
+					progname, fn, (int) size, XLogSegSize);
 			return false;
 		}
 	}
-	free(zerobuf);
 
-	if (lseek(f, SEEK_SET, 0) != 0)
+	/* No file existed, so create one */
+
+	f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, XLogSegSize);
+	if (f == NULL)
 	{
 		fprintf(stderr,
-				_("%s: could not seek to beginning of transaction log file \"%s\": %s\n"),
-				progname, fn, strerror(errno));
-		close(f);
+				_("%s: could not open transaction log file \"%s\": %s\n"),
+				progname, fn, stream->walmethod->getlasterror());
 		return false;
 	}
+
 	walfile = f;
 	return true;
 }
@@ -178,56 +164,50 @@ static bool
 close_walfile(StreamCtl *stream, XLogRecPtr pos)
 {
 	off_t		currpos;
+	int			r;
 
-	if (walfile == -1)
+	if (walfile == NULL)
 		return true;
 
-	currpos = lseek(walfile, 0, SEEK_CUR);
+	currpos = stream->walmethod->get_current_pos(walfile);
 	if (currpos == -1)
 	{
 		fprintf(stderr,
 			 _("%s: could not determine seek position in file \"%s\": %s\n"),
-				progname, current_walfile_name, strerror(errno));
+		  progname, current_walfile_name, stream->walmethod->getlasterror());
 		return false;
 	}
 
-	if (fsync(walfile) != 0)
+	if (stream->walmethod->fsync(walfile) != 0)
 	{
 		fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-				progname, current_walfile_name, strerror(errno));
+		  progname, current_walfile_name, stream->walmethod->getlasterror());
 		return false;
 	}
 
-	if (close(walfile) != 0)
+	if (stream->partial_suffix)
 	{
-		fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-				progname, current_walfile_name, strerror(errno));
-		walfile = -1;
-		return false;
+		if (currpos == XLOG_SEG_SIZE)
+			r = stream->walmethod->close(walfile, CLOSE_NORMAL);
+		else
+		{
+			fprintf(stderr,
+					_("%s: not renaming \"%s%s\", segment is not complete\n"),
+					progname, current_walfile_name, stream->partial_suffix);
+			r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
+		}
 	}
-	walfile = -1;
+	else
+		r = stream->walmethod->close(walfile, CLOSE_NORMAL);
 
-	/*
-	 * If we finished writing a .partial file, rename it into place.
-	 */
-	if (currpos == XLOG_SEG_SIZE && stream->partial_suffix)
-	{
-		char		oldfn[MAXPGPATH];
-		char		newfn[MAXPGPATH];
+	walfile = NULL;
 
-		snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix);
-		snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name);
-		if (rename(oldfn, newfn) != 0)
-		{
-			fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
-					progname, current_walfile_name, strerror(errno));
-			return false;
-		}
+	if (r != 0)
+	{
+		fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
+		  progname, current_walfile_name, stream->walmethod->getlasterror());
+		return false;
 	}
-	else if (stream->partial_suffix)
-		fprintf(stderr,
-				_("%s: not renaming \"%s%s\", segment is not complete\n"),
-				progname, current_walfile_name, stream->partial_suffix);
 
 	/*
 	 * Mark file as archived if requested by the caller - pg_basebackup needs
@@ -238,7 +218,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
 	if (currpos == XLOG_SEG_SIZE && stream->mark_done)
 	{
 		/* writes error message if failed */
-		if (!mark_file_as_archived(stream->basedir, current_walfile_name))
+		if (!mark_file_as_archived(stream, current_walfile_name))
 			return false;
 	}
 
@@ -253,9 +233,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
 static bool
 existsTimeLineHistoryFile(StreamCtl *stream)
 {
-	char		path[MAXPGPATH];
 	char		histfname[MAXFNAMELEN];
-	int			fd;
 
 	/*
 	 * Timeline 1 never has a history file. We treat that as if it existed,
@@ -266,31 +244,16 @@ existsTimeLineHistoryFile(StreamCtl *stream)
 
 	TLHistoryFileName(histfname, stream->timeline);
 
-	snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
-
-	fd = open(path, O_RDONLY | PG_BINARY, 0);
-	if (fd < 0)
-	{
-		if (errno != ENOENT)
-			fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s\n"),
-					progname, path, strerror(errno));
-		return false;
-	}
-	else
-	{
-		close(fd);
-		return true;
-	}
+	return stream->walmethod->existsfile(histfname);
 }
 
 static bool
 writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
 {
 	int			size = strlen(content);
-	char		path[MAXPGPATH];
 	char		tmppath[MAXPGPATH];
 	char		histfname[MAXFNAMELEN];
-	int			fd;
+	Walfile    *f;
 
 	/*
 	 * Check that the server's idea of how timeline history files should be
@@ -304,62 +267,39 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
 		return false;
 	}
 
-	snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
-
-	/*
-	 * Write into a temp file name.
-	 */
-	snprintf(tmppath, MAXPGPATH, "%s.tmp", path);
-
-	unlink(tmppath);
-
-	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-	if (fd < 0)
+	f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
+	if (f == NULL)
 	{
 		fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
+				progname, histfname, stream->walmethod->getlasterror());
 		return false;
 	}
 
-	errno = 0;
-	if ((int) write(fd, content, size) != size)
+	if ((int) stream->walmethod->write(f, content, size) != size)
 	{
-		int			save_errno = errno;
+		fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
+				progname, histfname, stream->walmethod->getlasterror());
 
 		/*
 		 * If we fail to make the file, delete it to release disk space
 		 */
-		close(fd);
-		unlink(tmppath);
-		errno = save_errno;
+		stream->walmethod->close(f, CLOSE_UNLINK);
 
-		fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
 		return false;
 	}
 
-	if (fsync(fd) != 0)
+	if (stream->walmethod->fsync(f) != 0)
 	{
-		close(fd);
 		fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
+				progname, tmppath, stream->walmethod->getlasterror());
+		stream->walmethod->close(f, CLOSE_NORMAL);
 		return false;
 	}
 
-	if (close(fd) != 0)
+	if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
 	{
 		fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
-		return false;
-	}
-
-	/*
-	 * Now move the completed history file into place with its final name.
-	 */
-	if (rename(tmppath, path) < 0)
-	{
-		fprintf(stderr, _("%s: could not rename file \"%s\" to \"%s\": %s\n"),
-				progname, tmppath, path, strerror(errno));
+				progname, histfname, stream->walmethod->getlasterror());
 		return false;
 	}
 
@@ -367,7 +307,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
 	if (stream->mark_done)
 	{
 		/* writes error message if failed */
-		if (!mark_file_as_archived(stream->basedir, histfname))
+		if (!mark_file_as_archived(stream, histfname))
 			return false;
 	}
 
@@ -736,10 +676,10 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 	}
 
 error:
-	if (walfile != -1 && close(walfile) != 0)
+	if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NORMAL) != 0)
 		fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-				progname, current_walfile_name, strerror(errno));
-	walfile = -1;
+		  progname, current_walfile_name, stream->walmethod->getlasterror());
+	walfile = NULL;
 	return false;
 }
 
@@ -823,12 +763,12 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 		 * If synchronous option is true, issue sync command as soon as there
 		 * are WAL data which has not been flushed yet.
 		 */
-		if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1)
+		if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
 		{
-			if (fsync(walfile) != 0)
+			if (stream->walmethod->fsync(walfile) != 0)
 			{
 				fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-						progname, current_walfile_name, strerror(errno));
+						progname, current_walfile_name, stream->walmethod->getlasterror());
 				goto error;
 			}
 			lastFlushPosition = blockpos;
@@ -879,7 +819,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 			/* Check the message type. */
 			if (copybuf[0] == 'k')
 			{
-				if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
+				if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
 										 &last_status))
 					goto error;
 			}
@@ -1032,7 +972,7 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
  * Process the keepalive message.
  */
 static bool
-ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 					XLogRecPtr blockpos, int64 *last_status)
 {
 	int			pos;
@@ -1059,7 +999,7 @@ ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
 	if (replyRequested && still_sending)
 	{
 		if (reportFlushPosition && lastFlushPosition < blockpos &&
-			walfile != -1)
+			walfile != NULL)
 		{
 			/*
 			 * If a valid flush location needs to be reported, flush the
@@ -1068,10 +1008,10 @@ ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
 			 * data has been successfully replicated or not, at the normal
 			 * shutdown of the server.
 			 */
-			if (fsync(walfile) != 0)
+			if (stream->walmethod->fsync(walfile) != 0)
 			{
 				fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-						progname, current_walfile_name, strerror(errno));
+						progname, current_walfile_name, stream->walmethod->getlasterror());
 				return false;
 			}
 			lastFlushPosition = blockpos;
@@ -1129,7 +1069,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 	 * Verify that the initial location in the stream matches where we think
 	 * we are.
 	 */
-	if (walfile == -1)
+	if (walfile == NULL)
 	{
 		/* No file open yet */
 		if (xlogoff != 0)
@@ -1143,12 +1083,11 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 	else
 	{
 		/* More data in existing segment */
-		/* XXX: store seek value don't reseek all the time */
-		if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+		if (stream->walmethod->get_current_pos(walfile) != xlogoff)
 		{
 			fprintf(stderr,
 					_("%s: got WAL data offset %08x, expected %08x\n"),
-					progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+					progname, xlogoff, (int) stream->walmethod->get_current_pos(walfile));
 			return false;
 		}
 	}
@@ -1169,7 +1108,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 		else
 			bytes_to_write = bytes_left;
 
-		if (walfile == -1)
+		if (walfile == NULL)
 		{
 			if (!open_walfile(stream, *blockpos))
 			{
@@ -1178,14 +1117,13 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 			}
 		}
 
-		if (write(walfile,
-				  copybuf + hdr_len + bytes_written,
-				  bytes_to_write) != bytes_to_write)
+		if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
+									 bytes_to_write) != bytes_to_write)
 		{
 			fprintf(stderr,
 				  _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
 					progname, bytes_to_write, current_walfile_name,
-					strerror(errno));
+					stream->walmethod->getlasterror());
 			return false;
 		}
 
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index 554ff8b..e6db14a 100644
--- a/src/bin/pg_basebackup/receivelog.h
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -13,6 +13,7 @@
 #define RECEIVELOG_H
 
 #include "libpq-fe.h"
+#include "walmethods.h"
 
 #include "access/xlogdefs.h"
 
@@ -39,7 +40,7 @@ typedef struct StreamCtl
 
 	stream_stop_callback stream_stop;	/* Stop streaming when returns true */
 
-	char	   *basedir;		/* Received segments written to this dir */
+	WalWriteMethod *walmethod;	/* How to write the WAL */
 	char	   *partial_suffix; /* Suffix appended to partially received files */
 } StreamCtl;
 
diff --git a/src/bin/pg_basebackup/t/010_pg_basebackup.pl b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
index 6c33936..797076d 100644
--- a/src/bin/pg_basebackup/t/010_pg_basebackup.pl
+++ b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
@@ -4,7 +4,7 @@ use Cwd;
 use Config;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 51;
+use Test::More tests => 53;
 
 program_help_ok('pg_basebackup');
 program_version_ok('pg_basebackup');
@@ -189,6 +189,10 @@ $node->command_ok(
 	'pg_basebackup -X stream runs');
 ok(grep(/^[0-9A-F]{24}$/, slurp_dir("$tempdir/backupxf/pg_xlog")),
 	'WAL files copied');
+$node->command_ok(
+	[ 'pg_basebackup', '-D', "$tempdir/backupxst", '-X', 'stream', '-Ft' ],
+	'pg_basebackup -X stream runs in tar mode');
+ok(-f "$tempdir/backupxst/pg_xlog.tar");
 
 $node->command_fails(
 	[ 'pg_basebackup', '-D', "$tempdir/fail", '-S', 'slot1' ],
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
new file mode 100644
index 0000000..7a5c6e3
--- /dev/null
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -0,0 +1,838 @@
+/*-------------------------------------------------------------------------
+ *
+ * walmethods.c - implementations of different ways to write received wal
+ *
+ * NOTE! The caller must ensure that only one method is instantiated in
+ *		 any given program, and that it's only instantiated once!
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/walmethods.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <sys/stat.h>
+#include <time.h>
+#include <unistd.h>
+#ifdef HAVE_LIBZ
+#include <zlib.h>
+#endif
+
+#include "pgtar.h"
+
+#include "receivelog.h"
+
+/* Size of zlib buffer for .tar.gz */
+#define ZLIB_OUT_SIZE 4096
+
+/*-------------------------------------------------------------------------
+ * WalDirectoryMethod - write wal to a directory looking like pg_xlog
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * Global static data for this method
+ */
+typedef struct DirectoryMethodData
+{
+	char	   *basedir;
+}	DirectoryMethodData;
+static DirectoryMethodData *dir_data = NULL;
+
+/*
+ * Local file handle
+ */
+typedef struct DirectoryMethodFile
+{
+	int			fd;
+	off_t		currpos;
+	char	   *pathname;
+	char	   *temp_suffix;
+}	DirectoryMethodFile;
+
+static char *
+dir_getlasterror(void)
+{
+	/* Directory method always sets errno, so just use strerror */
+	return strerror(errno);
+}
+
+static Walfile
+dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+{
+	static char tmppath[MAXPGPATH];
+	int			fd;
+	DirectoryMethodFile *f;
+
+	snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
+			 dir_data->basedir, pathname, temp_suffix ? temp_suffix : "");
+
+	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+	if (fd < 0)
+		return NULL;
+
+	if (pad_to_size)
+	{
+		/* Always pre-pad on regular files */
+		char	   *zerobuf;
+		int			bytes;
+
+		zerobuf = pg_malloc0(XLOG_BLCKSZ);
+		for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
+		{
+			if (write(fd, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+			{
+				int			save_errno = errno;
+
+				pg_free(zerobuf);
+				close(fd);
+				errno = save_errno;
+				return NULL;
+			}
+		}
+		pg_free(zerobuf);
+
+		if (lseek(fd, 0, SEEK_SET) != 0)
+			return NULL;
+	}
+
+	f = pg_malloc0(sizeof(DirectoryMethodFile));
+	f->fd = fd;
+	f->currpos = 0;
+	f->pathname = pg_strdup(pathname);
+	if (temp_suffix)
+		f->temp_suffix = pg_strdup(temp_suffix);
+	return f;
+}
+
+static ssize_t
+dir_write(Walfile f, const void *buf, size_t count)
+{
+	ssize_t		r;
+	DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+
+	Assert(f != NULL);
+
+	r = write(df->fd, buf, count);
+	if (r > 0)
+		df->currpos += r;
+	return r;
+}
+
+static off_t
+dir_get_current_pos(Walfile f)
+{
+	Assert(f != NULL);
+
+	/* Use a cached value to prevent lots of reseeks */
+	return ((DirectoryMethodFile *) f)->currpos;
+}
+
+static int
+dir_close(Walfile f, WalCloseMethod method)
+{
+	int			r;
+	DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+	static char tmppath[MAXPGPATH];
+	static char tmppath2[MAXPGPATH];
+
+	Assert(f != NULL);
+
+	r = close(df->fd);
+
+	if (r == 0)
+	{
+		/* Build path to the current version of the file */
+		if (method == CLOSE_NORMAL && df->temp_suffix)
+		{
+			/* If we have a temp prefix, normal is we rename the file */
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
+					 dir_data->basedir, df->pathname, df->temp_suffix);
+			snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
+					 dir_data->basedir, df->pathname);
+			r = rename(tmppath, tmppath2);
+		}
+		else if (method == CLOSE_UNLINK)
+		{
+			/* Unlink the file once it's closed */
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
+					 dir_data->basedir, df->pathname, df->temp_suffix ? df->temp_suffix : "");
+			r = unlink(tmppath);
+		}
+		/* else either CLOSE_NORMAL and no temp suffix, or CLOSE_NO_RENAME */
+	}
+
+	pg_free(df->pathname);
+	if (df->temp_suffix)
+		pg_free(df->temp_suffix);
+	pg_free(df);
+
+	return r;
+}
+
+static int
+dir_fsync(Walfile f)
+{
+	Assert(f != NULL);
+
+	return fsync(((DirectoryMethodFile *) f)->fd);
+}
+
+static ssize_t
+dir_get_file_size(const char *pathname)
+{
+	struct stat statbuf;
+	static char tmppath[MAXPGPATH];
+
+	snprintf(tmppath, sizeof(tmppath), "%s/%s",
+			 dir_data->basedir, pathname);
+
+	if (stat(tmppath, &statbuf) != 0)
+		return -1;
+
+	return statbuf.st_size;
+}
+
+static int
+dir_unlink(const char *pathname)
+{
+	static char tmppath[MAXPGPATH];
+
+	snprintf(tmppath, sizeof(tmppath), "%s/%s",
+			 dir_data->basedir, pathname);
+
+	return unlink(tmppath);
+}
+
+static bool
+dir_existsfile(const char *pathname)
+{
+	static char tmppath[MAXPGPATH];
+	int			fd;
+
+	snprintf(tmppath, sizeof(tmppath), "%s/%s",
+			 dir_data->basedir, pathname);
+
+	fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
+	if (fd < 0)
+		return false;
+	close(fd);
+	return true;
+}
+
+static bool
+dir_finish(void)
+{
+	/* No cleanup necessary */
+	return true;
+}
+
+
+WalWriteMethod *
+CreateWalDirectoryMethod(const char *basedir)
+{
+	WalWriteMethod *method;
+
+	method = pg_malloc0(sizeof(WalWriteMethod));
+	method->open_for_write = dir_open_for_write;
+	method->write = dir_write;
+	method->get_current_pos = dir_get_current_pos;
+	method->get_file_size = dir_get_file_size;
+	method->close = dir_close;
+	method->fsync = dir_fsync;
+	method->unlink = dir_unlink;
+	method->existsfile = dir_existsfile;
+	method->finish = dir_finish;
+	method->getlasterror = dir_getlasterror;
+
+	dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+	dir_data->basedir = pg_strdup(basedir);
+
+	return method;
+}
+
+
+/*-------------------------------------------------------------------------
+ * WalTarMethod - write wal to a tar file containing pg_xlog contents
+ *-------------------------------------------------------------------------
+ */
+
+typedef struct TarMethodFile
+{
+	off_t		ofs_start;		/* Where does the *header* for this file start */
+	off_t		currpos;
+	char		header[512];
+	char	   *pathname;
+	size_t		pad_to_size;
+}	TarMethodFile;
+
+typedef struct TarMethodData
+{
+	char	   *tarfilename;
+	int			fd;
+	int			compression;
+	TarMethodFile *currentfile;
+	char		lasterror[1024];
+#ifdef HAVE_LIBZ
+	z_streamp	zp;
+	void	   *zlibOut;
+#endif
+}	TarMethodData;
+static TarMethodData *tar_data = NULL;
+
+#define tar_clear_error() tar_data->lasterror[0] = '\0'
+#define tar_set_error(msg) strlcpy(tar_data->lasterror, msg, sizeof(tar_data->lasterror))
+
+static char *
+tar_getlasterror(void)
+{
+	/*
+	 * If a custom error is set, return that one. Otherwise, assume errno is
+	 * set and return that one.
+	 */
+	if (tar_data->lasterror[0])
+		return tar_data->lasterror;
+	return strerror(errno);
+}
+
+#ifdef HAVE_LIBZ
+static bool
+tar_write_compressed_data(void *buf, size_t count, bool flush)
+{
+	tar_data->zp->next_in = buf;
+	tar_data->zp->avail_in = count;
+
+	while (tar_data->zp->avail_in || flush)
+	{
+		int			r;
+
+		r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
+		if (r == Z_STREAM_ERROR)
+		{
+			tar_set_error("deflate failed");
+			return false;
+		}
+
+		if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
+		{
+			size_t		len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
+
+			if (write(tar_data->fd, tar_data->zlibOut, len) != len)
+				return false;
+
+			tar_data->zp->next_out = tar_data->zlibOut;
+			tar_data->zp->avail_out = ZLIB_OUT_SIZE;
+		}
+
+		if (r == Z_STREAM_END)
+			break;
+	}
+
+	if (flush)
+	{
+		/* Reset the stream for writing */
+		if (deflateReset(tar_data->zp) != Z_OK)
+		{
+			tar_set_error("deflateReset failed");
+			return false;
+		}
+	}
+
+	return true;
+}
+#endif
+
+static ssize_t
+tar_write(Walfile f, const void *buf, size_t count)
+{
+	ssize_t		r;
+
+	Assert(f != NULL);
+	tar_clear_error();
+
+	/* Tarfile will always be positioned at the end */
+	if (!tar_data->compression)
+	{
+		r = write(tar_data->fd, buf, count);
+		if (r > 0)
+			((TarMethodFile *) f)->currpos += r;
+		return r;
+	}
+#ifdef HAVE_LIBZ
+	else
+	{
+		if (!tar_write_compressed_data((void *) buf, count, false))
+			return -1;
+		((TarMethodFile *) f)->currpos += count;
+		return count;
+	}
+#endif
+}
+
+static bool
+tar_write_padding_data(TarMethodFile * f, size_t bytes)
+{
+	char	   *zerobuf = pg_malloc0(XLOG_BLCKSZ);
+	size_t		bytesleft = bytes;
+
+	while (bytesleft)
+	{
+		size_t		bytestowrite = bytesleft > XLOG_BLCKSZ ? XLOG_BLCKSZ : bytesleft;
+
+		size_t		r = tar_write(f, zerobuf, bytestowrite);
+
+		if (r < 0)
+			return false;
+		bytesleft -= r;
+	}
+	return true;
+}
+
+static Walfile
+tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+{
+	int			save_errno;
+	static char tmppath[MAXPGPATH];
+
+	tar_clear_error();
+
+	if (tar_data->fd < 0)
+	{
+		/*
+		 * We open the tar file only when we first try to write to it.
+		 */
+		tar_data->fd = open(tar_data->tarfilename,
+						  O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+		if (tar_data->fd < 0)
+			return NULL;
+
+#ifdef HAVE_LIBZ
+		if (tar_data->compression)
+		{
+			tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
+			tar_data->zp->zalloc = Z_NULL;
+			tar_data->zp->zfree = Z_NULL;
+			tar_data->zp->opaque = Z_NULL;
+			tar_data->zp->next_out = tar_data->zlibOut;
+			tar_data->zp->avail_out = ZLIB_OUT_SIZE;
+
+			/*
+			 * Initialize deflation library. Adding the magic value 16 to the
+			 * default 15 for the windowBits parameter makes the output be
+			 * gzip instead of zlib.
+			 */
+			if (deflateInit2(tar_data->zp, tar_data->compression, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
+			{
+				tar_set_error("deflateInit2 failed");
+				return NULL;
+			}
+		}
+#endif
+
+		/* There's no tar header itself, the file starts with regular files */
+	}
+
+	Assert(tar_data->currentfile == NULL);
+	if (tar_data->currentfile != NULL)
+	{
+		tar_set_error("implementation error: tar files can't have more than one open file\n");
+		return NULL;
+	}
+
+	tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
+
+	snprintf(tmppath, sizeof(tmppath), "%s%s",
+			 pathname, temp_suffix ? temp_suffix : "");
+
+	/* Create a header with size set to 0 - we will fill out the size on close */
+	if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
+	{
+		pg_free(tar_data->currentfile);
+		tar_data->currentfile = NULL;
+		tar_set_error("could not create tar header");
+		return NULL;
+	}
+
+#ifdef HAVE_LIBZ
+	if (tar_data->compression)
+	{
+		/* Flush existing data */
+		if (!tar_write_compressed_data(NULL, 0, true))
+			return NULL;
+
+		/* Turn off compression for header */
+		if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
+		{
+			tar_set_error("deflateParams failed");
+			return NULL;
+		}
+	}
+#endif
+
+	tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
+	if (tar_data->currentfile->ofs_start == -1)
+	{
+		save_errno = errno;
+		pg_free(tar_data->currentfile);
+		tar_data->currentfile = NULL;
+		errno = save_errno;
+		return NULL;
+	}
+	tar_data->currentfile->currpos = 0;
+
+	if (!tar_data->compression)
+	{
+		if (write(tar_data->fd, tar_data->currentfile->header, 512) != 512)
+		{
+			save_errno = errno;
+			pg_free(tar_data->currentfile);
+			tar_data->currentfile = NULL;
+			errno = save_errno;
+			return NULL;
+		}
+	}
+#ifdef HAVE_LIBZ
+	else
+	{
+		/* Write header through the zlib APIs but with no compression */
+		if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
+			return NULL;
+
+		/* Re-enable compression for the rest of the file */
+		if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
+		{
+			tar_set_error("deflateParams failed");
+			return NULL;
+		}
+	}
+#endif
+
+	tar_data->currentfile->pathname = pg_strdup(pathname);
+
+	/*
+	 * Uncompressed files are padded on creation, but for compression we can't
+	 * do that
+	 */
+	if (pad_to_size)
+	{
+		tar_data->currentfile->pad_to_size = pad_to_size;
+		if (!tar_data->compression)
+		{
+			/* Uncompressed, so pad now */
+			tar_write_padding_data(tar_data->currentfile, pad_to_size);
+			/* Seek back to start */
+			if (lseek(tar_data->fd, tar_data->currentfile->ofs_start, SEEK_SET) != tar_data->currentfile->ofs_start)
+				return NULL;
+
+			tar_data->currentfile->currpos = 0;
+		}
+	}
+
+	return tar_data->currentfile;
+}
+
+static ssize_t
+tar_get_file_size(const char *pathname)
+{
+	tar_clear_error();
+
+	/* Currently not used, so not supported */
+	errno = ENOSYS;
+	return -1;
+}
+
+static off_t
+tar_get_current_pos(Walfile f)
+{
+	Assert(f != NULL);
+	tar_clear_error();
+
+	return ((TarMethodFile *) f)->currpos;
+}
+
+static int
+tar_fsync(Walfile f)
+{
+	Assert(f != NULL);
+	tar_clear_error();
+
+	/*
+	 * Always sync the whole tarfile, because that's all we can do. This makes
+	 * no sense on compressed files, so just ignore those.
+	 */
+	if (tar_data->compression)
+		return 0;
+
+	return fsync(tar_data->fd);
+}
+
+static int
+tar_close(Walfile f, WalCloseMethod method)
+{
+	ssize_t		filesize;
+	int			padding;
+	TarMethodFile *tf = (TarMethodFile *) f;
+
+	Assert(f != NULL);
+	tar_clear_error();
+
+	if (method == CLOSE_UNLINK)
+	{
+		if (tar_data->compression)
+		{
+			tar_set_error("unlink not supported with compression");
+			return -1;
+		}
+
+		/*
+		 * Unlink the file that we just wrote to the tar. We do this by
+		 * truncating it to the start of the header. This is safe as we only
+		 * allow writing of the very last file.
+		 */
+		if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
+			return -1;
+
+		pg_free(tf->pathname);
+		pg_free(tf);
+		tar_data->currentfile = NULL;
+
+		return 0;
+	}
+
+	/*
+	 * Pad the file itself with zeroes if necessary. Note that this is
+	 * different from the tar format padding -- this is the padding we asked
+	 * for when the file was opened.
+	 */
+	if (tf->pad_to_size)
+	{
+		if (tar_data->compression)
+		{
+			/*
+			 * A compressed tarfile is padded on close since we cannot know
+			 * the size of the compressed output until the end.
+			 */
+			size_t		sizeleft = tf->pad_to_size - tf->currpos;
+
+			if (sizeleft)
+			{
+				if (!tar_write_padding_data(tf, sizeleft))
+					return -1;
+			}
+		}
+		else
+		{
+			/*
+			 * An uncompressed tarfile was padded on creation, so just adjust
+			 * the current position as if we seeked to the end.
+			 */
+			tf->currpos = tf->pad_to_size;
+		}
+	}
+
+	/*
+	 * Get the size of the file, and pad the current data up to the nearest
+	 * 512 byte boundary.
+	 */
+	filesize = tar_get_current_pos(f);
+	padding = ((filesize + 511) & ~511) - filesize;
+	if (padding)
+	{
+		char		zerobuf[512];
+
+		MemSet(zerobuf, 0, padding);
+		if (tar_write(f, zerobuf, padding) != padding)
+			return -1;
+	}
+
+
+#ifdef HAVE_LIBZ
+	if (tar_data->compression)
+	{
+		/* Flush the current buffer */
+		if (!tar_write_compressed_data(NULL, 0, true))
+		{
+			errno = EINVAL;
+			return -1;
+		}
+	}
+#endif
+
+	/*
+	 * Now go back and update the header with the correct filesize and
+	 * possibly also renaming the file. We overwrite the entire current header
+	 * when done, including the checksum.
+	 */
+	print_tar_number(&(tf->header[124]), 12, filesize);
+
+	if (method == CLOSE_NORMAL)
+
+		/*
+		 * We overwrite it with what it was before if we have no tempname,
+		 * since we're going to write the buffer anyway.
+		 */
+		strlcpy(&(tf->header[0]), tf->pathname, 100);
+
+	print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
+	if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
+		return -1;
+	if (!tar_data->compression)
+	{
+		if (write(tar_data->fd, tf->header, 512) != 512)
+			return -1;
+	}
+#ifdef HAVE_LIBZ
+	else
+	{
+		/* Turn off compression */
+		if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
+		{
+			tar_set_error("deflateParams failed");
+			return -1;
+		}
+
+		/* Overwrite the header, assuming the size will be the same */
+		if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
+			return -1;
+
+		/* Turn compression back on */
+		if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
+		{
+			tar_set_error("deflateParams failed");
+			return -1;
+		}
+	}
+#endif
+
+	/* Move file pointer back down to end, so we can write the next file */
+	if (lseek(tar_data->fd, 0, SEEK_END) < 0)
+		return -1;
+
+	/* Always fsync on close, so the padding gets fsynced */
+	tar_fsync(f);
+
+	/* Clean up and done */
+	pg_free(tf->pathname);
+	pg_free(tf);
+	tar_data->currentfile = NULL;
+
+	return 0;
+}
+
+static int
+tar_unlink(const char *pathname)
+{
+	tar_clear_error();
+	errno = ENOSYS;
+	return -1;
+}
+
+static bool
+tar_existsfile(const char *pathname)
+{
+	tar_clear_error();
+	/* We only deal with new tarfiles, so nothing externally created exists */
+	return false;
+}
+
+static bool
+tar_finish(void)
+{
+	char		zerobuf[1024];
+
+	tar_clear_error();
+
+	if (tar_data->currentfile)
+	{
+		if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0)
+			return false;
+	}
+
+	/* A tarfile always ends with two empty  blocks */
+	MemSet(zerobuf, 0, sizeof(zerobuf));
+	if (!tar_data->compression)
+	{
+		if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
+			return false;
+	}
+#ifdef HAVE_LIBZ
+	else
+	{
+		if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
+			return false;
+
+		/* Also flush all data to make sure the gzip stream is finished */
+		tar_data->zp->next_in = NULL;
+		tar_data->zp->avail_in = 0;
+		while (true)
+		{
+			int			r;
+
+			r = deflate(tar_data->zp, Z_FINISH);
+
+			if (r == Z_STREAM_ERROR)
+			{
+				tar_set_error("deflate failed");
+				return false;
+			}
+			if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
+			{
+				size_t		len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
+
+				if (write(tar_data->fd, tar_data->zlibOut, len) != len)
+					return false;
+			}
+			if (r == Z_STREAM_END)
+				break;
+		}
+
+		if (deflateEnd(tar_data->zp) != Z_OK)
+		{
+			tar_set_error("deflateEnd failed");
+			return false;
+		}
+	}
+#endif
+
+	/* sync the empty blocks as well, since they're after the last file */
+	fsync(tar_data->fd);
+
+	if (close(tar_data->fd) != 0)
+		return false;
+
+	tar_data->fd = -1;
+
+	return true;
+}
+
+WalWriteMethod *
+CreateWalTarMethod(const char *tarbase, int compression)
+{
+	WalWriteMethod *method;
+	const char *suffix = (compression > 0) ? ".tar.gz" : ".tar";
+
+	method = pg_malloc0(sizeof(WalWriteMethod));
+	method->open_for_write = tar_open_for_write;
+	method->write = tar_write;
+	method->get_current_pos = tar_get_current_pos;
+	method->get_file_size = tar_get_file_size;
+	method->close = tar_close;
+	method->fsync = tar_fsync;
+	method->unlink = tar_unlink;
+	method->existsfile = tar_existsfile;
+	method->finish = tar_finish;
+	method->getlasterror = tar_getlasterror;
+
+	tar_data = pg_malloc0(sizeof(TarMethodData));
+	tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
+	sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
+	tar_data->fd = -1;
+	tar_data->compression = compression;
+	if (compression)
+		tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
+
+	return method;
+}
diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h
new file mode 100644
index 0000000..9922cfd
--- /dev/null
+++ b/src/bin/pg_basebackup/walmethods.h
@@ -0,0 +1,46 @@
+/*-------------------------------------------------------------------------
+ *
+ * walmethods.h
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/walmethods.h
+ *-------------------------------------------------------------------------
+ */
+
+
+typedef void *Walfile;
+
+typedef enum
+{
+	CLOSE_NORMAL,
+	CLOSE_UNLINK,
+	CLOSE_NO_RENAME,
+}	WalCloseMethod;
+
+typedef struct WalWriteMethod WalWriteMethod;
+struct WalWriteMethod
+{
+	Walfile(*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size);
+	int			(*close) (Walfile f, WalCloseMethod method);
+	int			(*unlink) (const char *pathname);
+	bool		(*existsfile) (const char *pathname);
+	ssize_t		(*get_file_size) (const char *pathname);
+
+	ssize_t		(*write) (Walfile f, const void *buf, size_t count);
+	off_t		(*get_current_pos) (Walfile f);
+	int			(*fsync) (Walfile f);
+	bool		(*finish) (void);
+	char	   *(*getlasterror) (void);
+};
+
+/*
+ * Available WAL methods:
+ *	- WalDirectoryMethod - write WAL to regular files in a standard pg_xlog
+ *	- TarDirectoryMethod - write WAL to a tarfile corresponding to pg_xlog
+ *						   (only implements the methods required for pg_basebackup,
+ *						   not all those required for pg_receivexlog)
+ */
+WalWriteMethod *CreateWalDirectoryMethod(const char *basedir);
+WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression);
diff --git a/src/include/pgtar.h b/src/include/pgtar.h
index 45ca400..1d179f0 100644
--- a/src/include/pgtar.h
+++ b/src/include/pgtar.h
@@ -22,4 +22,5 @@ enum tarError
 extern enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget,
 			  pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime);
 extern uint64 read_tar_number(const char *s, int len);
+extern void print_tar_number(char *s, int len, uint64 val);
 extern int	tarChecksum(char *header);
diff --git a/src/port/tar.c b/src/port/tar.c
index 52a2113..f1da959 100644
--- a/src/port/tar.c
+++ b/src/port/tar.c
@@ -16,7 +16,7 @@
  * support only non-negative numbers, so we don't worry about the GNU rules
  * for handling negative numbers.)
  */
-static void
+void
 print_tar_number(char *s, int len, uint64 val)
 {
 	if (val < (((uint64) 1) << ((len - 1) * 3)))
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to