23.04.2019 14:08, Anastasia Lubennikova wrote:
> I'm volunteering to write a draft patch or, more likely, set of patches,
> which will allow us to discuss the subject in more detail.
> And to do that I wish we agree on the API and data format (at least
> broadly). Looking forward to hearing your thoughts.

Though the previous discussion stalled,
I still hope that we can agree on basic points such as the map file format and the protocol extension,
which are necessary to start implementing the feature.

--------- Proof Of Concept patch ---------

Attached is a prototype of incremental pg_basebackup, which consists of two features:

1) To perform an incremental backup, call pg_basebackup with a new argument:

pg_basebackup -D 'basedir' --prev-backup-start-lsn 'lsn'

where 'lsn' is the start LSN of the parent backup (it can be found in the parent's "backup_label" file).

This makes pg_basebackup issue the BASE_BACKUP replication command with a new option, PREV_BACKUP_START_LSN 'lsn'.
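
For example, an incremental request over the replication protocol ends up looking roughly like this (the LSN value is just a placeholder; the other options depend on the flags passed to pg_basebackup):

BASE_BACKUP LABEL 'pg_basebackup base backup' NOWAIT PREV_BACKUP_START_LSN '0/3000028'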

For data files, only pages with LSN > prev_backup_start_lsn are included in the backup. They are saved into a 'filename.partial' file, while a 'filename.blockmap' file contains an array of their BlockNumbers. For example, if we backed up blocks 1, 3, and 5, 'filename.partial' will contain those 3 blocks and 'filename.blockmap' will contain the array {1,3,5}.
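
In the patch, the per-page test in sendFileMap() and sendFilePartial() boils down to:

    /* include a page only if it is initialized and was changed
     * after the parent backup started */
    if (!PageIsNew(page) && PageGetLSN(page) > prev_backup_start_lsn)
    {
        pagemap[n_blocks_to_send] = blkno;  /* block number goes to filename.blockmap */
        n_blocks_to_send++;                 /* the page itself goes to filename.partial */
    }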

Non-data files use the same format as before.

2) To merge an incremental backup into a full backup, call:

pg_basebackup -D 'basedir' --incremental-pgdata 'incremental_basedir' --merge-backups

It will move all files from 'incremental_basedir' to 'basedir', handling '.partial' files correctly.
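
To make the merge step concrete, here is a minimal, illustrative sketch (helper name, error handling, and constants are mine, not the patch's) of applying one 'relfile.partial' + 'relfile.blockmap' pair on top of the corresponding relation file in basedir. It assumes the formats described above, the default page size, and the server's native BlockNumber byte order:

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#define BLCKSZ 8192                 /* assuming the default page size */
typedef uint32_t BlockNumber;

/* Overwrite blocks of 'basefile' (which must already exist, coming from the
 * parent backup) with the blocks stored in 'partialfile', placing the i-th
 * partial block at offset blockmap[i] * BLCKSZ. */
static void
apply_partial_file(const char *basefile, const char *partialfile, const char *mapfile)
{
    FILE       *base = fopen(basefile, "r+b");
    FILE       *partial = fopen(partialfile, "rb");
    FILE       *map = fopen(mapfile, "rb");
    BlockNumber blkno;
    char        page[BLCKSZ];

    if (base == NULL || partial == NULL || map == NULL)
    {
        fprintf(stderr, "could not open files for merge\n");
        exit(1);
    }

    /* each 4-byte entry in the .blockmap gives the target block number
     * of the next BLCKSZ-sized block in the .partial file */
    while (fread(&blkno, sizeof(BlockNumber), 1, map) == 1)
    {
        if (fread(page, BLCKSZ, 1, partial) != 1)
        {
            fprintf(stderr, "premature end of partial file\n");
            exit(1);
        }
        if (fseeko(base, (off_t) blkno * BLCKSZ, SEEK_SET) != 0 ||
            fwrite(page, BLCKSZ, 1, base) != 1)
        {
            fprintf(stderr, "could not write block %u\n", (unsigned) blkno);
            exit(1);
        }
    }

    fclose(map);
    fclose(partial);
    fclose(base);
}

(fsync'ing the result and handling files that were added, dropped, or truncated between backups are left out of this sketch.)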


--------- Questions to discuss ---------

Please note that this is just a proof-of-concept patch and it can be optimized in many ways.
Let's concentrate on issues that affect the protocol or data format.

1) Whether we collect block maps using a simple "read everything page by page" approach, WAL scanning, or any other page-tracking algorithm, we must choose a map format.
I implemented the simplest one; there are other options:

- We could have a map not per file, but per relation or even per tablespace,
which would make the implementation more complex, but probably more efficient.
The only problem I see with the existing per-file implementation is that even if only
a few blocks changed, we still must pad the map to a 512-byte boundary per the tar
format requirements, so a map with a single 4-byte BlockNumber still occupies a full
512-byte tar block.

- We could also store LSNs in the block map:

typedef struct BlockMapItem {
    BlockNumber blkno;
    XLogRecPtr lsn;
} BlockMapItem;

In my implementation, an invalid prev_backup_start_lsn means falling back to a regular basebackup without any block maps. Alternatively, we could give this value another meaning and send a block map for every file.
Backup utilities can use these maps to speed up backup merge or restore.

2) We could implement a BASE_BACKUP SEND_FILELIST replication command,
which would return a list of filenames with their sizes, plus block maps if an LSN was provided.

To avoid changing the format, we can simply send tar headers for each file:
- tarHeader("filename.blockmap") followed by the block map for relation files, if prev_backup_start_lsn is provided;
- tarHeader("filename") without the actual file content, for non-relation files or for all files in a "FULL" backup.

The caller can parse these messages and use them for any purpose, for example, to perform a parallel backup.
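
Only the name and size fields of each 512-byte tar header are needed for that; a rough sketch of the parsing (illustrative helper, field offsets per the ustar format):

#include <stdlib.h>
#include <string.h>

/* Extract the file name and size from a 512-byte ustar header.
 * 'name' must have room for at least 101 bytes. */
static void
parse_tar_header(const char *hdr, char *name, unsigned long *size)
{
    memcpy(name, hdr, 100);                  /* name field: offset 0, 100 bytes */
    name[100] = '\0';
    *size = strtoul(hdr + 124, NULL, 8);     /* size field: offset 124, octal ASCII */
}

For instance, the sizes alone are enough to split the file list across several parallel connections.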

Thoughts?

--
Anastasia Lubennikova
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 13e0d23..e757bba 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -10459,7 +10459,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 			ti->oid = pstrdup(de->d_name);
 			ti->path = pstrdup(buflinkpath.data);
 			ti->rpath = relpath ? pstrdup(relpath) : NULL;
-			ti->size = infotbssize ? sendTablespace(fullpath, true) : -1;
+			ti->size = infotbssize ? sendTablespace(fullpath, true, InvalidXLogRecPtr) : -1;
 
 			if (tablespaces)
 				*tablespaces = lappend(*tablespaces, ti);
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index c2978a9..3560da1 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -41,6 +41,7 @@
 #include "utils/ps_status.h"
 #include "utils/relcache.h"
 #include "utils/timestamp.h"
+#include "utils/pg_lsn.h"
 
 
 typedef struct
@@ -52,13 +53,22 @@ typedef struct
 	bool		includewal;
 	uint32		maxrate;
 	bool		sendtblspcmapfile;
+	XLogRecPtr	prev_backup_start_lsn;
 } basebackup_options;
 
 
 static int64 sendDir(const char *path, int basepathlen, bool sizeonly,
-					 List *tablespaces, bool sendtblspclinks);
+					 List *tablespaces, bool sendtblspclinks, XLogRecPtr prev_backup_start_lsn);
 static bool sendFile(const char *readfilename, const char *tarfilename,
-					 struct stat *statbuf, bool missing_ok, Oid dboid);
+					 struct stat *statbuf, bool missing_ok, Oid dboid,
+					 XLogRecPtr prev_backup_start_lsn);
+static bool sendFileMap(const char *readfilename, const char *tarfilename,
+		struct stat *statbuf, bool missing_ok, Oid dboid,
+		XLogRecPtr prev_backup_start_lsn, int *expected_write_size);
+static bool sendFilePartial(const char *readfilename, const char *tarfilename,
+		struct stat *statbuf, bool missing_ok, Oid dboid,
+		XLogRecPtr prev_backup_start_lsn, int expected_write_size);
+
 static void sendFileWithContent(const char *filename, const char *content);
 static int64 _tarWriteHeader(const char *filename, const char *linktarget,
 							 struct stat *statbuf, bool sizeonly);
@@ -275,7 +285,8 @@ perform_base_backup(basebackup_options *opt)
 
 		/* Add a node for the base directory at the end */
 		ti = palloc0(sizeof(tablespaceinfo));
-		ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1;
+		ti->size = opt->progress ? sendDir(".", 1, true, tablespaces,
+										   true, opt->prev_backup_start_lsn) : -1;
 		tablespaces = lappend(tablespaces, ti);
 
 		/* Send tablespace header */
@@ -331,10 +342,10 @@ perform_base_backup(basebackup_options *opt)
 				if (tblspc_map_file && opt->sendtblspcmapfile)
 				{
 					sendFileWithContent(TABLESPACE_MAP, tblspc_map_file->data);
-					sendDir(".", 1, false, tablespaces, false);
+					sendDir(".", 1, false, tablespaces, false, opt->prev_backup_start_lsn);
 				}
 				else
-					sendDir(".", 1, false, tablespaces, true);
+					sendDir(".", 1, false, tablespaces, true, opt->prev_backup_start_lsn);
 
 				/* ... and pg_control after everything else. */
 				if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
@@ -342,10 +353,10 @@ perform_base_backup(basebackup_options *opt)
 							(errcode_for_file_access(),
 							 errmsg("could not stat file \"%s\": %m",
 									XLOG_CONTROL_FILE)));
-				sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
+				sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid, InvalidXLogRecPtr);
 			}
 			else
-				sendTablespace(ti->path, false);
+				sendTablespace(ti->path, false, opt->prev_backup_start_lsn);
 
 			/*
 			 * If we're including WAL, and this is the main data directory we
@@ -592,7 +603,7 @@ perform_base_backup(basebackup_options *opt)
 						(errcode_for_file_access(),
 						 errmsg("could not stat file \"%s\": %m", pathbuf)));
 
-			sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid);
+			sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid, InvalidXLogRecPtr);
 
 			/* unconditionally mark file as archived */
 			StatusFilePath(pathbuf, fname, ".done");
@@ -650,6 +661,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 	bool		o_maxrate = false;
 	bool		o_tablespace_map = false;
 	bool		o_noverify_checksums = false;
+	bool		o_prev_backup_start_lsn = false;
 
 	MemSet(opt, 0, sizeof(*opt));
 	foreach(lopt, options)
@@ -738,6 +750,25 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 			noverify_checksums = true;
 			o_noverify_checksums = true;
 		}
+		else if (strcmp(defel->defname, "prev_backup_start_lsn") == 0)
+		{
+			char *prev_backup_start_lsn_str;
+			XLogRecPtr prev_backup_start_lsn;
+			bool		have_error = false;
+
+			if (o_prev_backup_start_lsn)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+
+			prev_backup_start_lsn_str = strVal(defel->arg);
+			elog(WARNING, "prev_backup_start_lsn_str: %s", prev_backup_start_lsn_str);
+			prev_backup_start_lsn = pg_lsn_in_internal(prev_backup_start_lsn_str, &have_error);
+			//TODO handle parsing error
+
+			opt->prev_backup_start_lsn = (XLogRecPtr) prev_backup_start_lsn;
+			o_prev_backup_start_lsn = true;
+		}
 		else
 			elog(ERROR, "option \"%s\" not recognized",
 				 defel->defname);
@@ -966,7 +997,9 @@ sendFileWithContent(const char *filename, const char *content)
  * Only used to send auxiliary tablespaces, not PGDATA.
  */
 int64
-sendTablespace(char *path, bool sizeonly)
+sendTablespace(char* path, bool sizeonly,
+					   XLogRecPtr prev_backup_start_lsn)
+
 {
 	int64		size;
 	char		pathbuf[MAXPGPATH];
@@ -999,7 +1032,9 @@ sendTablespace(char *path, bool sizeonly)
 						   sizeonly);
 
 	/* Send all the files in the tablespace version directory */
-	size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true);
+
+	size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true,
+							prev_backup_start_lsn);
 
 	return size;
 }
@@ -1018,7 +1053,7 @@ sendTablespace(char *path, bool sizeonly)
  */
 static int64
 sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
-		bool sendtblspclinks)
+		bool sendtblspclinks, XLogRecPtr prev_backup_start_lsn)
 {
 	DIR		   *dir;
 	struct dirent *de;
@@ -1294,7 +1329,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
 				skip_this_dir = true;
 
 			if (!skip_this_dir)
-				size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks);
+				size += sendDir(pathbuf, basepathlen, sizeonly,
+										tablespaces, sendtblspclinks, prev_backup_start_lsn);
 		}
 		else if (S_ISREG(statbuf.st_mode))
 		{
@@ -1302,7 +1338,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
 
 			if (!sizeonly)
 				sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
-								true, isDbDir ? pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid);
+								true, isDbDir ? pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid,
+										prev_backup_start_lsn);
 
 			if (sent || sizeonly)
 			{
@@ -1363,10 +1400,14 @@ is_checksummed_file(const char *fullpath, const char *filename)
  *
  * Returns true if the file was successfully sent, false if 'missing_ok',
  * and the file did not exist.
+ *
+ * If prev_backup_start_lsn is not InvalidXLogRecPtr, send .partial file,
+ * containing blocks for incremental backup and .blockmap file.
  */
+
 static bool
 sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf,
-		 bool missing_ok, Oid dboid)
+		 bool missing_ok, Oid dboid, XLogRecPtr prev_backup_start_lsn)
 {
 	FILE	   *fp;
 	BlockNumber blkno = 0;
@@ -1383,6 +1424,21 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
 	int			segmentno = 0;
 	char	   *segmentpath;
 	bool		verify_checksum = false;
+	bool		file_has_map = false;
+	int expected_write_size = 0;
+
+	/* Send map, if requested. */
+	if (prev_backup_start_lsn)
+		file_has_map = sendFileMap(readfilename, tarfilename, statbuf,
+					missing_ok, dboid, prev_backup_start_lsn, &expected_write_size);
+
+	/*
+	 * If possible, send the incremental version of the file;
+	 * all non-relation files will be sent by the code below.
+	 */
+	if (file_has_map)
+		return sendFilePartial(readfilename, tarfilename, statbuf,
+					missing_ok, dboid, prev_backup_start_lsn, expected_write_size);
 
 	fp = AllocateFile(readfilename, "rb");
 	if (fp == NULL)
@@ -1447,6 +1503,8 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
 			verify_checksum = false;
 		}
 
+		/* Iterate over pages to get the info we need:
+		 * either checksum verification or collecting a map. */
 		if (verify_checksum)
 		{
 			for (i = 0; i < cnt / BLCKSZ; i++)
@@ -1468,15 +1526,15 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
 					if (phdr->pd_checksum != checksum)
 					{
 						/*
-						 * Retry the block on the first failure.  It's
-						 * possible that we read the first 4K page of the
-						 * block just before postgres updated the entire block
-						 * so it ends up looking torn to us.  We only need to
-						 * retry once because the LSN should be updated to
-						 * something we can ignore on the next pass.  If the
-						 * error happens again then it is a true validation
-						 * failure.
-						 */
+						* Retry the block on the first failure.  It's
+						* possible that we read the first 4K page of the
+						* block just before postgres updated the entire block
+						* so it ends up looking torn to us.  We only need to
+						* retry once because the LSN should be updated to
+						* something we can ignore on the next pass.  If the
+						* error happens again then it is a true validation
+						* failure.
+						*/
 						if (block_retry == false)
 						{
 							/* Reread the failed block */
@@ -1484,7 +1542,7 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
 							{
 								ereport(ERROR,
 										(errcode_for_file_access(),
-										 errmsg("could not fseek in file \"%s\": %m",
+										errmsg("could not fseek in file \"%s\": %m",
 												readfilename)));
 							}
 
@@ -1492,7 +1550,7 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
 							{
 								ereport(ERROR,
 										(errcode_for_file_access(),
-										 errmsg("could not reread block %d of file \"%s\": %m",
+										errmsg("could not reread block %d of file \"%s\": %m",
 												blkno, readfilename)));
 							}
 
@@ -1500,7 +1558,7 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
 							{
 								ereport(ERROR,
 										(errcode_for_file_access(),
-										 errmsg("could not fseek in file \"%s\": %m",
+										errmsg("could not fseek in file \"%s\": %m",
 												readfilename)));
 							}
 
@@ -1593,6 +1651,232 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
 }
 
 
+static bool
+sendFileMap(const char *readfilename, const char *tarfilename, struct stat *statbuf,
+		 bool missing_ok, Oid dboid, XLogRecPtr prev_backup_start_lsn,
+		 int *expected_write_size)
+{
+	FILE	   *fp;
+	BlockNumber blkno = 0;
+	char		buf[TAR_SEND_SIZE];
+	off_t		cnt;
+	int			i;
+	pgoff_t		len = 0;
+	char	   *page;
+	size_t		pad;
+	char		*tarfilename_blockmap = NULL;
+	BlockNumber *pagemap = NULL;
+	char	   *filename;
+	int			statbuf_size = statbuf->st_size;
+	int 		pagemap_real_size;
+	int 		n_blocks_to_send = 0;
+
+	Assert(prev_backup_start_lsn != InvalidXLogRecPtr);
+
+	tarfilename_blockmap = psprintf("%s.blockmap", tarfilename);
+
+	/*
+	 * Get the filename (excluding path).  As last_dir_separator()
+	 * includes the last directory separator, we chop that off by
+	 * incrementing the pointer.
+
+	 */
+	filename = last_dir_separator(readfilename) + 1;
+
+	/*
+	 * Handle all non relation files here.
+	 * Do nothing.
+	 */
+	if (!is_checksummed_file(readfilename, filename) ||
+		!S_ISREG(statbuf->st_mode) ||
+		(filename[0] == 't' && isdigit(filename[1])) || // exclude all temp files
+		!isdigit(filename[0]) || // relfiles always start with number
+		strstr(filename, "_")) // exclude all fork files
+	{
+		elog(INFO, "sendFileMap %s, no datafile", filename);
+		return false;
+	}
+	elog(INFO, "sendFileMap %s, datafile", filename);
+
+	fp = AllocateFile(readfilename, "rb");
+	if (fp == NULL)
+	{
+		if (errno == ENOENT && missing_ok)
+			return false;
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m", readfilename)));
+	}
+
+	/* allocate pagemap of the size enough to write all file blocks */
+	pagemap = palloc0((statbuf->st_size / BLCKSZ)*sizeof(BlockNumber));
+
+	while ((cnt = fread(buf, 1, Min(sizeof(buf), statbuf->st_size - len), fp)) > 0)
+	{
+		/* iterate over pages to collect a map */
+		for (i = 0; i < cnt / BLCKSZ; i++)
+		{
+			page = buf + BLCKSZ * i;
+			/* add block to map */
+			if (!PageIsNew(page) && PageGetLSN(page) > prev_backup_start_lsn)
+			{
+				pagemap[n_blocks_to_send] = blkno;
+				elog(INFO, "expected_write_size %d add to map blkno %d pagemap[n_blocks_to_send] %d of file %s page lsn %X/%X prev_backup_start_lsn %X/%X",
+						*expected_write_size, blkno, pagemap[n_blocks_to_send], readfilename, (uint32) (PageGetLSN(page) >> 32), (uint32) PageGetLSN(page),
+						(uint32) (prev_backup_start_lsn >> 32), (uint32) prev_backup_start_lsn);
+				*expected_write_size += BLCKSZ;
+				n_blocks_to_send++;
+			}
+			blkno++;
+		}
+
+		len += cnt;
+
+		if (len >= statbuf->st_size)
+		{
+			/*
+			 * Reached end of file. The file could be longer, if it was
+			 * extended while we were sending it, but for a base backup we can
+			 * ignore such extended data. It will be restored from WAL.
+			 */
+			break;
+		}
+	}
+
+	pagemap_real_size = n_blocks_to_send*sizeof(BlockNumber);
+
+	statbuf->st_size = pagemap_real_size;
+	_tarWriteHeader(tarfilename_blockmap, NULL, statbuf, false);
+
+	if (pagemap_real_size)
+	{
+		pq_putmessage('d', (char *) pagemap, pagemap_real_size);
+
+		/*
+		* Pad to 512 byte boundary, per tar format requirements. (This small
+		* piece of data is probably not worth throttling.)
+		*/
+		pad = ((pagemap_real_size + 511) & ~511) - pagemap_real_size;
+		if (pad > 0)
+		{
+			MemSet(buf, 0, pad);
+			pq_putmessage('d', buf, pad);
+		}
+	}
+
+	statbuf->st_size = statbuf_size;
+	FreeFile(fp);
+
+	pfree(pagemap);
+	return true;
+}
+
+static bool
+sendFilePartial(const char *readfilename, const char *tarfilename, struct stat *statbuf,
+		 bool missing_ok, Oid dboid, XLogRecPtr prev_backup_start_lsn,
+		 int expected_write_size)
+{
+	FILE	   *fp;
+	BlockNumber blkno = 0;
+	char		buf[TAR_SEND_SIZE];
+	char		sendbuf[TAR_SEND_SIZE];
+	int			n_blocks_to_send = 0;
+	off_t		cnt;
+	int			i;
+	pgoff_t		len = 0;
+	char	   *page;
+	char		*tarfilename_partial = NULL;
+	int			pad;
+	int			statbuf_size;
+	int			write_len = 0;
+
+	Assert(prev_backup_start_lsn != InvalidXLogRecPtr);
+
+	tarfilename_partial = psprintf("%s.partial", tarfilename);
+
+	statbuf_size = statbuf->st_size;
+	statbuf->st_size = expected_write_size;
+	_tarWriteHeader(tarfilename_partial, NULL, statbuf, false);
+	statbuf->st_size = statbuf_size;
+
+	fp = AllocateFile(readfilename, "rb");
+	if (fp == NULL)
+	{
+		if (errno == ENOENT && missing_ok)
+			return false;
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m", readfilename)));
+	}
+
+	while ((cnt = fread(buf, 1, Min(sizeof(buf), statbuf->st_size - len), fp)) > 0)
+	{
+		/* iterate over pages to collect a map */
+		for (i = 0; i < cnt / BLCKSZ; i++)
+		{
+			page = buf + BLCKSZ * i;
+
+			if (!PageIsNew(page) && PageGetLSN(page) > prev_backup_start_lsn)
+			{
+				elog(INFO, "add to sendbuf blkno %d, n_blocks_to_send %d of file %s page lsn %X/%X prev_backup_start_lsn %X/%X",
+						blkno, n_blocks_to_send, readfilename, (uint32) (PageGetLSN(page) >> 32), (uint32) PageGetLSN(page),
+						(uint32) (prev_backup_start_lsn >> 32), (uint32) prev_backup_start_lsn);
+				memcpy(sendbuf + BLCKSZ * n_blocks_to_send, page, BLCKSZ);
+				n_blocks_to_send++;
+			}
+			blkno++;
+		}
+
+		{
+			elog(INFO, "send n_blocks_to_send %d of file %s",
+							n_blocks_to_send, readfilename);
+			/* Send the chunk as a CopyData message */
+			write_len += n_blocks_to_send*BLCKSZ;
+			if (pq_putmessage('d', sendbuf, n_blocks_to_send*BLCKSZ))
+				ereport(ERROR,
+						(errmsg("base backup could not send data, aborting backup")));
+			n_blocks_to_send = 0;
+		}
+
+		len += cnt;
+
+		if (len >= statbuf->st_size)
+		{
+			/*
+			 * Reached end of file. The file could be longer, if it was
+			 * extended while we were sending it, but for a base backup we can
+			 * ignore such extended data. It will be restored from WAL.
+			 */
+			break;
+		}
+	}
+
+	if (write_len < expected_write_size)
+	{
+		MemSet(buf, 0, sizeof(buf));
+		while (write_len < expected_write_size)
+		{
+			cnt = Min(sizeof(buf), expected_write_size - write_len);
+			pq_putmessage('d', buf, cnt);
+			write_len += cnt;
+			throttle(cnt);
+		}
+	}
+
+	/* Pad to 512 byte boundary, per tar format requirements */
+	pad = ((write_len + 511) & ~511) - write_len;
+	if (pad > 0)
+	{
+		char		buf[512];
+
+		MemSet(buf, 0, pad);
+		pq_putmessage('d', buf, pad);
+	}
+
+	FreeFile(fp);
+	return true;
+}
+
 static int64
 _tarWriteHeader(const char *filename, const char *linktarget,
 				struct stat *statbuf, bool sizeonly)
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index c4e11cc..cb883a8 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -87,6 +87,7 @@ static SQLCmd *make_sqlcmd(void);
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
+%token K_PREV_BACKUP_START_LSN
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
@@ -103,6 +104,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <list>	create_slot_opt_list
 %type <defelt>	create_slot_opt
 
+
 %%
 
 firstcmd: command opt_semicolon
@@ -155,7 +157,7 @@ var_name:	IDENT	{ $$ = $1; }
 
 /*
  * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
- * [MAX_RATE %d] [TABLESPACE_MAP] [NOVERIFY_CHECKSUMS]
+ * [MAX_RATE %d] [TABLESPACE_MAP] [NOVERIFY_CHECKSUMS] [K_PREV_BACKUP_START_LSN 'start_lsn']
  */
 base_backup:
 			K_BASE_BACKUP base_backup_opt_list
@@ -213,6 +215,11 @@ base_backup_opt:
 				{
 				  $$ = makeDefElem("noverify_checksums",
 								   (Node *)makeInteger(true), -1);
+			}
+			| K_PREV_BACKUP_START_LSN SCONST
+				{
+				  $$ = makeDefElem("prev_backup_start_lsn",
+								   (Node *)makeString($2), -1);
 				}
 			;
 
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 380faeb..042e148 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -107,6 +107,7 @@ EXPORT_SNAPSHOT		{ return K_EXPORT_SNAPSHOT; }
 NOEXPORT_SNAPSHOT	{ return K_NOEXPORT_SNAPSHOT; }
 USE_SNAPSHOT		{ return K_USE_SNAPSHOT; }
 WAIT				{ return K_WAIT; }
+PREV_BACKUP_START_LSN	{ return K_PREV_BACKUP_START_LSN; }
 
 ","				{ return ','; }
 ";"				{ return ';'; }
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 15f43f9..bd2930e 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -106,6 +106,12 @@ static bool create_slot = false;
 static bool no_slot = false;
 static bool verify_checksums = true;
 
+
+static char* prev_backup_start_lsn = NULL;
+static char* prev_backup_start_lsn_str = NULL;
+static char* incremental_basedir = NULL;
+static bool merge_backups = false;
+
 static bool success = false;
 static bool made_new_pgdata = false;
 static bool found_existing_pgdata = false;
@@ -150,6 +156,7 @@ static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
 static void GenerateRecoveryConf(PGconn *conn);
 static void WriteRecoveryConf(void);
 static void BaseBackup(void);
+static void MergeBackups(void);
 
 static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
 								 bool segment_finished);
@@ -1473,6 +1480,9 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 			 */
 			snprintf(filename, sizeof(filename), "%s/%s", current_path,
 					 copybuf);
+
+			pg_log_info("filename %s current_len_left %ld", filename, current_len_left);
+
 			if (filename[strlen(filename) - 1] == '/')
 			{
 				/*
@@ -1863,8 +1873,12 @@ BaseBackup(void)
 			fprintf(stderr, "\n");
 	}
 
+	if (prev_backup_start_lsn)
+		prev_backup_start_lsn_str = psprintf("PREV_BACKUP_START_LSN \'%s\'",
+											 prev_backup_start_lsn);
+
 	basebkp =
-		psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
+		psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s %s",
 				 escaped_label,
 				 showprogress ? "PROGRESS" : "",
 				 includewal == FETCH_WAL ? "WAL" : "",
@@ -1872,7 +1886,9 @@ BaseBackup(void)
 				 includewal == NO_WAL ? "" : "NOWAIT",
 				 maxrate_clause ? maxrate_clause : "",
 				 format == 't' ? "TABLESPACE_MAP" : "",
-				 verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+				 verify_checksums ? "" : "NOVERIFY_CHECKSUMS",
+				 prev_backup_start_lsn_str?prev_backup_start_lsn_str:""
+				);
 
 	if (PQsendQuery(conn, basebkp) == 0)
 	{
@@ -2158,6 +2174,88 @@ BaseBackup(void)
 		pg_log_info("base backup completed");
 }
 
+static void
+walkdir(const char *path, const char *basepath)
+{
+	DIR		   *dir;
+	struct dirent *de;
+
+	dir = opendir(path);
+	if (dir == NULL)
+	{
+		pg_log_error("could not open directory \"%s\": %m", path);
+		return;
+	}
+
+	while (errno = 0, (de = readdir(dir)) != NULL)
+	{
+		char		subpath[MAXPGPATH * 2];
+		char		basesubpath[MAXPGPATH * 2];
+		struct stat fst;
+		int			sret;
+
+		if (strcmp(de->d_name, ".") == 0 ||
+			strcmp(de->d_name, "..") == 0)
+			continue;
+
+		snprintf(subpath, sizeof(subpath), "%s/%s", path, de->d_name);
+		snprintf(basesubpath, sizeof(basesubpath), "%s/%s", basepath, de->d_name);
+
+		/* Don't process symlinks */
+		sret = lstat(subpath, &fst);
+
+		if (sret < 0)
+		{
+			pg_log_error("could not stat file \"%s\": %m", subpath);
+			continue;
+		}
+
+		if (S_ISREG(fst.st_mode))
+		{
+			char	basicfilename[MAXPGPATH * 2];
+			char	topath[MAXPGPATH * 2];
+			char *partial_suffix = NULL;
+			if ((partial_suffix = strstr(de->d_name, ".partial")) != NULL) //handle incremental files
+			{
+				char	mappath[MAXPGPATH * 2];
+
+				snprintf(basicfilename, sizeof(basicfilename), "%.*s", (int) (partial_suffix - de->d_name), de->d_name);
+
+				snprintf(mappath, sizeof(mappath), "%s.blockmap", basicfilename);
+				partial_suffix = strstr(basesubpath, ".partial");
+				pg_log_info("incremental basic %s, map %s, partial %s",
+							basicfilename, mappath, de->d_name);
+
+				snprintf(topath, sizeof(topath), "%.*s", (int) (partial_suffix - basesubpath), basesubpath);
+				pg_log_info("incremental move from %s to %s", subpath, topath);
+			}
+			else if (!strstr(de->d_name, ".blockmap")) //skip .blockmap files
+			{
+				pg_log_info("non-incremental move from %s to %s", subpath, basesubpath);
+			}
+		}
+		else if (S_ISDIR(fst.st_mode))
+			walkdir(subpath, basesubpath);
+	}
+
+	if (errno)
+		pg_log_error("could not read directory \"%s\": %m", path);
+
+	(void) closedir(dir);
+}
+
+static void
+MergeBackups(void)
+{
+	/*
+	 * walk all files in incremental_basedir
+	 * For files that doesn't have ".blockmap",
+	 * just replace file in a basedir with a new one.
+	 * For files that have ".blockmap"
+	 * read incremental file block by block and update file in basedir
+	 */
+	walkdir(incremental_basedir, basedir);
+}
 
 int
 main(int argc, char **argv)
@@ -2191,6 +2289,9 @@ main(int argc, char **argv)
 		{"waldir", required_argument, NULL, 1},
 		{"no-slot", no_argument, NULL, 2},
 		{"no-verify-checksums", no_argument, NULL, 3},
+		{"prev-backup-start-lsn", required_argument, NULL, 5},
+		{"incremental-pgdata", required_argument, NULL, 6},
+		{"merge-backups", no_argument, NULL, 7},
 		{NULL, 0, NULL, 0}
 	};
 	int			c;
@@ -2359,6 +2460,15 @@ main(int argc, char **argv)
 			case 3:
 				verify_checksums = false;
 				break;
+			case 5:
+				prev_backup_start_lsn = pg_strdup(optarg);
+				break;
+			case 6:
+				incremental_basedir = pg_strdup(optarg);
+				break;
+			case 7:
+				merge_backups = true;
+				break;
 			default:
 
 				/*
@@ -2393,6 +2503,18 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	if (merge_backups && (incremental_basedir == NULL))
+	{
+		pg_log_error("no target incremental directory specified");
+		exit(1);
+	}
+
+	if (merge_backups && incremental_basedir)
+	{
+		MergeBackups();
+		return 0;
+	}
+
 	/*
 	 * Mutually exclusive arguments
 	 */
diff --git a/src/include/replication/basebackup.h b/src/include/replication/basebackup.h
index 503a5b9..974c126 100644
--- a/src/include/replication/basebackup.h
+++ b/src/include/replication/basebackup.h
@@ -31,6 +31,5 @@ typedef struct
 
 extern void SendBaseBackup(BaseBackupCmd *cmd);
 
-extern int64 sendTablespace(char *path, bool sizeonly);
-
+extern int64 sendTablespace(char *path, bool sizeonly, XLogRecPtr prev_backup_start_lsn);
 #endif							/* _BASEBACKUP_H */
