Currently, CREATE DATABASE forces a checkpoint, then copies all the
files, then forces another checkpoint. The comments in the createdb()
function explain the reasons for this. The attached patch fixes this
problem by making CREATE DATABASE completely WAL-logged so that now we
can avoid checkpoints.  The patch modifies both CREATE DATABASE and
ALTER DATABASE..SET TABLESPACE to be fully WAL-logged.

One main advantage of this change is that it will be cheaper. Forcing
checkpoints on an idle system is no big deal, but when the system is
under heavy write load, it's very expensive. Another advantage is that
it makes things better for features like TDE, which might want the
pages in the source database to be encrypted using a different key or
nonce than the pages in the target database.


Design Idea:
-----------------
First, create the target database directory along with the version
file and WAL-log this operation.  Create the "relation map file" in
the target database and copy the content from the source database. For
this, we can use a modified version of write_relmap_file() and
WAL-log the relmap create operation along with the file content.  Now,
read the relmap file to find the relfilenode for pg_class and then we
read pg_class block by block and decode the tuples. For reading the
pg_class blocks, we can use ReadBufferWithoutRelcache() so that we
don't need the relcache.  Nothing prevents us from checking visibility
for tuples in another database because CLOG is global to the cluster.
And nothing prevents us from deforming those tuples because the column
definitions for pg_class have to be the same in every database. Then
we can get the relfilenode of every file we need to copy, and prepare
a list of all such relfilenode.  Next, for each relfilenode in the
source database, create a respective relfilenode in the target
database (for all forks) using smgrcreate, which is already a
WAL-logged operation.  Now read the source relfilenode block by block
using ReadBufferWithoutRelcache() and copy the block to the target
relfilenode using smgrextend() and WAL-log them using log_newpage().
For the source database, we cannot use smgrread() directly, because
there could be dirty buffers in shared memory.  We therefore have to
read the blocks through the buffer manager interface; otherwise, we
would first have to flush all the dirty buffers.

WAL sequence using pg_waldump
----------------------------------------------------
1. (new wal to create db dir and write PG_VERSION file)
rmgr: Database desc: CREATE create dir 1663/16394

2. (new wal to create and write relmap file)
rmgr: RelMap   desc: CREATE database 16394 tablespace 1663 size 512

3. (create relfilenode)
rmgr: Storage  desc: CREATE base/16394/16384
rmgr: Storage  desc: CREATE base/16394/2619

4. (write page data)
rmgr: XLOG     desc: FPI , blkref #0: rel 1663/16394/2619 blk 0 FPW
rmgr: XLOG     desc: FPI , blkref #0: rel 1663/16394/2619 blk 1 FPW
............
5. (create other forks)
rmgr: Storage  desc: CREATE base/16394/2619_fsm
rmgr: Storage  desc: CREATE base/16394/2619_vm
.............

I have attached a POC patch that demonstrates this idea.  With this
patch, all basic sanity tests and "check-world" pass.

Open points:
-------------------
- This is a POC patch, so it needs more refactoring/cleanup and testing.
- The SMGR-level API usage might need to be revisited.


Credits:
-----------
Thanks to Robert Haas, for suggesting this idea and the high-level design.

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
From e472d3cb744dc45641d36e919098f9570f80a8fd Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Sat, 5 Jun 2021 17:08:13 +0530
Subject: [PATCH v1] WAL logged CREATE DATABASE

Currently, CREATE DATABASE forces a checkpoint, then copies all the files,
then forces another checkpoint. The comments in the createdb() function
explain the reasons for this. This patch fixes the problem by making
CREATE DATABASE completely WAL-logged so that we can avoid the checkpoints.
---
 src/backend/access/rmgrdesc/dbasedesc.c  |   3 +-
 src/backend/access/rmgrdesc/relmapdesc.c |  10 +
 src/backend/access/transam/xlogutils.c   |  12 +-
 src/backend/commands/dbcommands.c        | 653 ++++++++++++++++++++-----------
 src/backend/storage/buffer/bufmgr.c      |  13 +-
 src/backend/utils/cache/relmapper.c      | 222 +++++++----
 src/bin/pg_rewind/parsexlog.c            |   5 +
 src/include/commands/dbcommands_xlog.h   |   7 +-
 src/include/storage/bufmgr.h             |   3 +-
 src/include/utils/relmapper.h            |   6 +-
 10 files changed, 613 insertions(+), 321 deletions(-)

diff --git a/src/backend/access/rmgrdesc/dbasedesc.c b/src/backend/access/rmgrdesc/dbasedesc.c
index 2660984..5010f72 100644
--- a/src/backend/access/rmgrdesc/dbasedesc.c
+++ b/src/backend/access/rmgrdesc/dbasedesc.c
@@ -28,8 +28,7 @@ dbase_desc(StringInfo buf, XLogReaderState *record)
 	{
 		xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
 
-		appendStringInfo(buf, "copy dir %u/%u to %u/%u",
-						 xlrec->src_tablespace_id, xlrec->src_db_id,
+		appendStringInfo(buf, "create dir %u/%u",
 						 xlrec->tablespace_id, xlrec->db_id);
 	}
 	else if (info == XLOG_DBASE_DROP)
diff --git a/src/backend/access/rmgrdesc/relmapdesc.c b/src/backend/access/rmgrdesc/relmapdesc.c
index 2f9d4f5..9ff1aae 100644
--- a/src/backend/access/rmgrdesc/relmapdesc.c
+++ b/src/backend/access/rmgrdesc/relmapdesc.c
@@ -29,6 +29,13 @@ relmap_desc(StringInfo buf, XLogReaderState *record)
 		appendStringInfo(buf, "database %u tablespace %u size %u",
 						 xlrec->dbid, xlrec->tsid, xlrec->nbytes);
 	}
+	if (info == XLOG_RELMAP_CREATE)
+	{
+		xl_relmap_update *xlrec = (xl_relmap_update *) rec;
+
+		appendStringInfo(buf, "database %u tablespace %u size %u",
+						 xlrec->dbid, xlrec->tsid, xlrec->nbytes);
+	}	
 }
 
 const char *
@@ -41,6 +48,9 @@ relmap_identify(uint8 info)
 		case XLOG_RELMAP_UPDATE:
 			id = "UPDATE";
 			break;
+		case XLOG_RELMAP_CREATE:
+			id = "CREATE";
+			break;	
 	}
 
 	return id;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index d17d660..45bbba7 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -463,8 +463,8 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 	if (blkno < lastblock)
 	{
 		/* page exists in file */
-		buffer = ReadBufferWithoutRelcache(rnode, forknum, blkno,
-										   mode, NULL);
+		buffer = ReadBufferWithoutRelcache(rnode, forknum, blkno, mode, NULL,
+										   RELPERSISTENCE_PERMANENT);
 	}
 	else
 	{
@@ -488,8 +488,8 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 					LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 				ReleaseBuffer(buffer);
 			}
-			buffer = ReadBufferWithoutRelcache(rnode, forknum,
-											   P_NEW, mode, NULL);
+			buffer = ReadBufferWithoutRelcache(rnode, forknum, P_NEW, mode,
+											   NULL, RELPERSISTENCE_PERMANENT);
 		}
 		while (BufferGetBlockNumber(buffer) < blkno);
 		/* Handle the corner case that P_NEW returns non-consecutive pages */
@@ -498,8 +498,8 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 			if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
 				LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 			ReleaseBuffer(buffer);
-			buffer = ReadBufferWithoutRelcache(rnode, forknum, blkno,
-											   mode, NULL);
+			buffer = ReadBufferWithoutRelcache(rnode, forknum, blkno, mode,
+											   NULL, RELPERSISTENCE_PERMANENT);
 		}
 	}
 
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 2b159b6..53f3b6e 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -36,10 +36,14 @@
 #include "catalog/indexing.h"
 #include "catalog/objectaccess.h"
 #include "catalog/pg_authid.h"
+#include "catalog/pg_auth_members.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_db_role_setting.h"
+#include "catalog/pg_proc.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_tablespace.h"
+#include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
 #include "commands/comment.h"
 #include "commands/dbcommands.h"
 #include "commands/dbcommands_xlog.h"
@@ -62,6 +66,7 @@
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/pg_locale.h"
+#include "utils/relmapper.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
@@ -77,6 +82,13 @@ typedef struct
 	Oid			dest_tsoid;		/* tablespace we are trying to move to */
 } movedb_failure_params;
 
+typedef struct RelationInfo
+{
+	RelFileNode		rnode;			/* spcNode/dbNode/relNode of a source relation */
+	char			relpersistence;	/* taken from pg_class.relpersistence */
+} RelationInfo;
+
+
 /* non-export function prototypes */
 static void createdb_failure_callback(int code, Datum arg);
 static void movedb(const char *dbname, const char *tblspcname);
@@ -91,6 +103,387 @@ static bool have_createdb_privilege(void);
 static void remove_dbtablespaces(Oid db_id);
 static bool check_db_file_conflict(Oid db_id);
 static int	errdetail_busy_db(int notherbackends, int npreparedxacts);
+static void CreateDatabaseDirectory(char *dbpath, Oid dbid, Oid tsid,
+									bool isRedo);
+static List *GetDatabaseValidRelList(Oid srctbid, Oid srcdbid,
+									 Oid relfilenode);
+void RelationCopyStorageUsingBuffer(SMgrRelation src, SMgrRelation dst,
+									ForkNumber forkNum, char relpersistence);
+static void CopyDatabase(Oid src_dboid, Oid dboid, Oid src_tsid, Oid dst_tsid);
+
+/*
+ * CreateDatabaseDirectory - Create the database directory at dbpath, write
+ * PG_MAJORVERSION into its PG_VERSION file and, unless isRedo, WAL-log it.
+ * If isRedo is true, it's okay for the directory and file to exist already.
+ */
+static void
+CreateDatabaseDirectory(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
+{
+	int		fd;
+	int		nbytes;
+	char	versionfile[MAXPGPATH];
+
+	/* Create an empty db directory */
+	if (MakePGDirectory(dbpath) < 0)
+	{
+		/* An already-existing directory is tolerated only in WAL replay */
+		if (errno != EEXIST || !isRedo)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not create directory \"%s\": %m", dbpath)));
+	}
+
+	/* Build the path of the PG_VERSION file inside the new directory */
+	snprintf(versionfile, sizeof(versionfile), "%s/%s",
+			 dbpath, "PG_VERSION");
+
+	fd = OpenTransientFile(versionfile, O_RDWR | O_CREAT | O_EXCL | PG_BINARY);
+
+	/*
+	 * If the file already exists and we are in WAL replay, just reopen the
+	 * existing file for writing (it is not truncated).
+	 */
+	if (fd < 0 && errno == EEXIST && isRedo)
+		fd = OpenTransientFile(versionfile, O_RDWR | PG_BINARY);
+	if (fd < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not create file \"%s\": %m", versionfile)));
+
+	nbytes = strlen(PG_MAJORVERSION);
+
+	/* Outside WAL replay, log the directory and version file creation */
+	if (!isRedo)
+	{
+		xl_dbase_create_rec xlrec;
+		XLogRecPtr	lsn;
+
+		/* now errors are fatal; that includes the write check further down */
+		START_CRIT_SECTION();
+
+		xlrec.db_id = dbid;
+		xlrec.tablespace_id = tsid;
+		xlrec.nbytes = nbytes;
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) (&xlrec), MinSizeOfDbaseCreateRec);
+		XLogRegisterData((char *) PG_MAJORVERSION, nbytes);
+
+		lsn = XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE);
+
+		/* As always, WAL must hit the disk before the data update does */
+		XLogFlush(lsn);
+	}
+
+	/* Write the version string; runs both in normal operation and in redo */
+	pgstat_report_wait_start(WAIT_EVENT_COPY_FILE_WRITE);
+	errno = 0;
+	if ((int) write(fd, (char *) PG_MAJORVERSION, nbytes) != nbytes)
+	{
+		/* if write didn't set errno, assume problem is no disk space */
+		if (errno == 0)
+			errno = ENOSPC;
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not write to file \"%s\": %m", versionfile)));
+	}
+	pgstat_report_wait_end();
+
+	/* Close the file (NOTE(review): no fsync is issued here -- confirm) */
+	CloseTransientFile(fd);
+
+	/* Critical section done (it was only entered when not in redo) */
+	if (!isRedo)
+		END_CRIT_SECTION();
+}
+
+/*
+ * GetDatabaseValidRelList - Get list of all valid relnode of the source db
+ *
+ * Read the source database's pg_class (identified by relfilenode) block by
+ * block and return a palloc'd RelationInfo for every visible relation.
+ */
+static List *
+GetDatabaseValidRelList(Oid srctbid, Oid srcdbid, Oid relfilenode)
+{
+	SMgrRelation	rd_smgr;
+	RelFileNode		rnode;
+	BlockNumber		nblocks;
+	BlockNumber		blkno;
+	OffsetNumber	offnum;
+	OffsetNumber	maxoff;
+	Buffer			buf;
+	Page			page;
+	List		   *rnodelist = NIL;
+	HeapTupleData	tuple;
+	Form_pg_class	classForm;
+	BufferAccessStrategy bstrategy;
+
+	rnode.spcNode = srctbid;
+	rnode.dbNode = srcdbid;
+	rnode.relNode = relfilenode;
+
+	rd_smgr = smgropen(rnode, InvalidBackendId);
+	nblocks = smgrnblocks(rd_smgr, MAIN_FORKNUM);
+	bstrategy = GetAccessStrategy(BAS_BULKREAD);
+
+	/*
+	 * Process each block for the pg_class relfilenode and check for the
+	 * visible tuple.  Store the relnode of the visible tuple in the list.
+	 * Later in the caller, these relnode files will be processed and copied
+	 * to the destination block by block.  Every exit from an iteration must
+	 * drop the buffer lock and pin taken at its start.
+	 */
+	for (blkno = 0; blkno < nblocks; blkno++)
+	{
+		buf = ReadBufferWithoutRelcache(rnode, MAIN_FORKNUM, blkno,
+										RBM_NORMAL, bstrategy,
+										RELPERSISTENCE_PERMANENT);
+
+		LockBuffer(buf, BUFFER_LOCK_SHARE);
+		page = BufferGetPage(buf);
+		if (PageIsNew(page) || PageIsEmpty(page))
+		{
+			UnlockReleaseBuffer(buf);
+			continue;
+		}
+
+		/* Collect the relations described by the tuples on this page */
+		maxoff = PageGetMaxOffsetNumber(page);
+		for (offnum = FirstOffsetNumber;
+			 offnum <= maxoff;
+			 offnum = OffsetNumberNext(offnum))
+		{
+			ItemId		itemid;
+
+			itemid = PageGetItemId(page, offnum);
+
+			/* Nothing to do if slot is empty or already dead */
+			if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
+				ItemIdIsRedirected(itemid))
+				continue;
+
+			Assert(ItemIdIsNormal(itemid));
+			ItemPointerSet(&(tuple.t_self), blkno, offnum);
+			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+			tuple.t_len = ItemIdGetLength(itemid);
+			tuple.t_tableOid = RelationRelationId;
+
+			/* Check whether the tuple is visible */
+			if (HeapTupleSatisfiesVisibility(&tuple, GetActiveSnapshot(), buf))
+			{
+				Oid				relnode = InvalidOid;
+				RelationInfo   *relinfo;
+
+				classForm = (Form_pg_class) GETSTRUCT(&tuple);
+
+				/* We only want to scan the object which has storage. */
+				if (!RELKIND_HAS_STORAGE(classForm->relkind))
+					continue;
+
+				/* Ignore the global objects. */
+				if (classForm->reltablespace == GLOBALTABLESPACE_OID)
+					continue;
+
+				/* Use relfilenode when set; else built-in rels use the oid */
+				if (OidIsValid(classForm->relfilenode))
+					relnode = classForm->relfilenode;
+				else if (classForm->oid < FirstGenbkiObjectId)
+					relnode = classForm->oid;
+				else
+					continue;
+
+				Assert(OidIsValid(relnode));
+
+				/* Prepare a rel info element and add to the list */
+				relinfo = (RelationInfo *) palloc(sizeof(RelationInfo));
+				if (OidIsValid(classForm->reltablespace))
+					relinfo->rnode.spcNode = classForm->reltablespace;
+				else
+					relinfo->rnode.spcNode = srctbid;
+
+				relinfo->rnode.dbNode = srcdbid;
+				relinfo->rnode.relNode = relnode;
+				relinfo->relpersistence = classForm->relpersistence;
+
+				rnodelist = lappend(rnodelist, relinfo);
+			}
+		}
+		UnlockReleaseBuffer(buf);
+	}
+
+	return rnodelist;
+}
+
+/*
+ * Copy one fork of src to dst block by block, reading src through buffers.
+ */
+void
+RelationCopyStorageUsingBuffer(SMgrRelation src, SMgrRelation dst,
+							   ForkNumber forkNum, char relpersistence)
+{
+	Buffer		buf;
+	Page		page;
+	bool		use_wal;
+	bool		copying_initfork;
+	BlockNumber nblocks;
+	BlockNumber blkno;
+	BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD);
+
+	/*
+	 * The init fork for an unlogged relation in many respects has to be
+	 * treated the same as normal relation, changes need to be WAL logged and
+	 * it needs to be synced to disk.
+	 */
+	copying_initfork = relpersistence == RELPERSISTENCE_UNLOGGED &&
+		forkNum == INIT_FORKNUM;
+
+	/*
+	 * We WAL-log the copied data for a permanent relation, and for the init
+	 * fork of an unlogged one.  Note that the decision does not depend on
+	 * whether WAL archiving/streaming is enabled, only on relpersistence and
+	 * forkNum.
+	 */
+	use_wal = relpersistence == RELPERSISTENCE_PERMANENT || copying_initfork;
+
+	nblocks = smgrnblocks(src, forkNum);
+
+	for (blkno = 0; blkno < nblocks; blkno++)
+	{
+		/* If we got a cancel signal during the copy of the data, quit */
+		CHECK_FOR_INTERRUPTS();
+
+		buf = ReadBufferWithoutRelcache(src->smgr_rnode.node, forkNum,
+										blkno, RBM_NORMAL, bstrategy,
+										relpersistence);
+		page = BufferGetPage(buf);
+		if (PageIsNew(page) || PageIsEmpty(page))
+		{
+			ReleaseBuffer(buf);
+			continue;
+		}
+
+		/*
+		 * WAL-log the copied page as a full page, since we don't know what
+		 * kind of page this is.  NOTE(review): buf is pinned but never
+		 * content-locked in this function, and page is handed as-is to
+		 * log_newpage() and PageSetChecksumInplace() -- confirm this is safe.
+		 */
+		if (use_wal)
+			log_newpage(&dst->smgr_rnode.node, forkNum, blkno, page, false);
+
+		PageSetChecksumInplace(page, blkno);
+
+		/*
+		 * Now write the page.  We say skipFsync = true because there's no
+		 * need for smgr to schedule an fsync for this write; we'll do it
+		 * ourselves below.
+		 */
+		smgrextend(dst, forkNum, blkno, (char *) page, true);
+		ReleaseBuffer(buf);
+	}
+
+	/*
+	 * When we WAL-logged rel pages, we must nonetheless fsync them.  The
+	 * reason is that the destination is written with smgrextend(), outside
+	 * shared buffers, so a CHECKPOINT occurring during the copy has no way
+	 * to flush the previously written data to disk (indeed it won't know the
+	 * new rel even exists).  A crash later on would replay WAL from the
+	 * checkpoint, therefore it wouldn't replay our earlier WAL entries. If
+	 * we do not fsync those pages here, they might still not be on disk when
+	 * the crash occurs.
+	 */
+	if (use_wal)
+		smgrimmedsync(dst, forkNum);
+}
+
+/*
+ * CopyDatabase - copy database src_dboid to dboid relation by relation.
+ * Relations in src_tsid go to dst_tsid; others keep their tablespace.
+ */
+static void
+CopyDatabase(Oid src_dboid, Oid dboid, Oid src_tsid, Oid dst_tsid)
+{
+	char	   *srcpath;
+	char	   *dstpath;
+	Oid			relfilenode;
+	List	   *rnodelist = NULL;
+	ListCell   *cell;
+	RelationInfo   *relinfo;
+	RelFileNode	    srcrnode;
+	RelFileNode		dstrnode;
+
+	/* Create the default tablespace destination database directory */
+	dstpath = GetDatabasePath(dboid, dst_tsid);
+
+	/* Create database directory and write PG_VERSION file */
+	CreateDatabaseDirectory(dstpath, dboid, dst_tsid, false);
+
+	/* Copy the relfilenode mapping file */
+	srcpath = GetDatabasePath(src_dboid, src_tsid);
+	CreateAndCopyRelMap(dboid, dst_tsid, srcpath, dstpath);
+
+	/* Get pg_class relfilenode */
+	relfilenode = DatabaseRelationOidToFilenode(srcpath, RelationRelationId);
+
+	/* get list of all valid relnode from the source database */
+	rnodelist = GetDatabaseValidRelList(src_tsid, src_dboid, relfilenode);
+	Assert(rnodelist != NIL);
+
+	/*
+	 * Copy each listed relation, all forks, from the source to the target.
+	 */
+	foreach(cell, rnodelist)
+	{
+		SMgrRelation	src_smgr;
+		SMgrRelation	dst_smgr;
+
+		relinfo = lfirst(cell);
+		srcrnode = relinfo->rnode;
+
+		/* Use source relnode tablespace if it's not a default table space */
+		if (srcrnode.spcNode != src_tsid)
+			dstrnode.spcNode = srcrnode.spcNode;
+		else
+			dstrnode.spcNode = dst_tsid;
+
+		dstrnode.dbNode = dboid;
+		dstrnode.relNode = srcrnode.relNode;
+
+		/* In-place move: a rel outside src_tsid maps onto itself; skip it */
+		if (srcrnode.dbNode == dboid && srcrnode.spcNode == dstrnode.spcNode)
+			continue;
+
+		/* Open the source and the destination relation at smgr level */
+		src_smgr = smgropen(srcrnode, InvalidBackendId);
+		dst_smgr = smgropen(dstrnode, InvalidBackendId);
+
+		RelationCreateStorage(dstrnode, relinfo->relpersistence);
+
+		/* copy main fork */
+		RelationCopyStorageUsingBuffer(src_smgr, dst_smgr, MAIN_FORKNUM,
+									   relinfo->relpersistence);
+
+		/* copy those extra forks that exist */
+		for (ForkNumber forkNum = MAIN_FORKNUM + 1;
+			forkNum <= MAX_FORKNUM; forkNum++)
+		{
+			if (smgrexists(src_smgr, forkNum))
+			{
+				smgrcreate(dst_smgr, forkNum, false);
+
+				/*
+				 * WAL-log creation for a permanent rel or an unlogged init fork.
+				 */
+				if (relinfo->relpersistence == RELPERSISTENCE_PERMANENT ||
+					(relinfo->relpersistence == RELPERSISTENCE_UNLOGGED &&
+					forkNum == INIT_FORKNUM))
+					log_smgrcreate(&dstrnode, forkNum);
+				RelationCopyStorageUsingBuffer(src_smgr, dst_smgr, forkNum,
+											   relinfo->relpersistence);
+			}
+		}
+	}
+
+	list_free_deep(rnodelist);
+}
 
 
 /*
@@ -99,8 +492,6 @@ static int	errdetail_busy_db(int notherbackends, int npreparedxacts);
 Oid
 createdb(ParseState *pstate, const CreatedbStmt *stmt)
 {
-	TableScanDesc scan;
-	Relation	rel;
 	Oid			src_dboid;
 	Oid			src_owner;
 	int			src_encoding = -1;
@@ -592,140 +983,19 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
 	/* Post creation hook for new database */
 	InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
 
-	/*
-	 * Force a checkpoint before starting the copy. This will force all dirty
-	 * buffers, including those of unlogged tables, out to disk, to ensure
-	 * source database is up-to-date on disk for the copy.
-	 * FlushDatabaseBuffers() would suffice for that, but we also want to
-	 * process any pending unlink requests. Otherwise, if a checkpoint
-	 * happened while we're copying files, a file might be deleted just when
-	 * we're about to copy it, causing the lstat() call in copydir() to fail
-	 * with ENOENT.
-	 */
-	RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
-					  | CHECKPOINT_FLUSH_ALL);
-
-	/*
-	 * Once we start copying subdirectories, we need to be able to clean 'em
-	 * up if we fail.  Use an ENSURE block to make sure this happens.  (This
-	 * is not a 100% solution, because of the possibility of failure during
-	 * transaction commit after we leave this routine, but it should handle
-	 * most scenarios.)
-	 */
 	fparms.src_dboid = src_dboid;
 	fparms.dest_dboid = dboid;
 	PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
 							PointerGetDatum(&fparms));
-	{
-		/*
-		 * Iterate through all tablespaces of the template database, and copy
-		 * each one to the new database.
-		 */
-		rel = table_open(TableSpaceRelationId, AccessShareLock);
-		scan = table_beginscan_catalog(rel, 0, NULL);
-		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
-		{
-			Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
-			Oid			srctablespace = spaceform->oid;
-			Oid			dsttablespace;
-			char	   *srcpath;
-			char	   *dstpath;
-			struct stat st;
-
-			/* No need to copy global tablespace */
-			if (srctablespace == GLOBALTABLESPACE_OID)
-				continue;
-
-			srcpath = GetDatabasePath(src_dboid, srctablespace);
-
-			if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
-				directory_is_empty(srcpath))
-			{
-				/* Assume we can ignore it */
-				pfree(srcpath);
-				continue;
-			}
-
-			if (srctablespace == src_deftablespace)
-				dsttablespace = dst_deftablespace;
-			else
-				dsttablespace = srctablespace;
-
-			dstpath = GetDatabasePath(dboid, dsttablespace);
-
-			/*
-			 * Copy this subdirectory to the new location
-			 *
-			 * We don't need to copy subdirectories
-			 */
-			copydir(srcpath, dstpath, false);
-
-			/* Record the filesystem change in XLOG */
-			{
-				xl_dbase_create_rec xlrec;
-
-				xlrec.db_id = dboid;
-				xlrec.tablespace_id = dsttablespace;
-				xlrec.src_db_id = src_dboid;
-				xlrec.src_tablespace_id = srctablespace;
-
-				XLogBeginInsert();
-				XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
-
-				(void) XLogInsert(RM_DBASE_ID,
-								  XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
-			}
-		}
-		table_endscan(scan);
-		table_close(rel, AccessShareLock);
-
-		/*
-		 * We force a checkpoint before committing.  This effectively means
-		 * that committed XLOG_DBASE_CREATE operations will never need to be
-		 * replayed (at least not in ordinary crash recovery; we still have to
-		 * make the XLOG entry for the benefit of PITR operations). This
-		 * avoids two nasty scenarios:
-		 *
-		 * #1: When PITR is off, we don't XLOG the contents of newly created
-		 * indexes; therefore the drop-and-recreate-whole-directory behavior
-		 * of DBASE_CREATE replay would lose such indexes.
-		 *
-		 * #2: Since we have to recopy the source database during DBASE_CREATE
-		 * replay, we run the risk of copying changes in it that were
-		 * committed after the original CREATE DATABASE command but before the
-		 * system crash that led to the replay.  This is at least unexpected
-		 * and at worst could lead to inconsistencies, eg duplicate table
-		 * names.
-		 *
-		 * (Both of these were real bugs in releases 8.0 through 8.0.3.)
-		 *
-		 * In PITR replay, the first of these isn't an issue, and the second
-		 * is only a risk if the CREATE DATABASE and subsequent template
-		 * database change both occur while a base backup is being taken.
-		 * There doesn't seem to be much we can do about that except document
-		 * it as a limitation.
-		 *
-		 * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
-		 * we can avoid this.
-		 */
-		RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
-
-		/*
-		 * Close pg_database, but keep lock till commit.
-		 */
-		table_close(pg_database_rel, NoLock);
-
-		/*
-		 * Force synchronous commit, thus minimizing the window between
-		 * creation of the database files and committal of the transaction. If
-		 * we crash before committing, we'll have a DB that's taking up disk
-		 * space but is not in pg_database, which is not good.
-		 */
-		ForceSyncCommit();
-	}
+	CopyDatabase(src_dboid, dboid, src_deftablespace, dst_deftablespace);
 	PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
 								PointerGetDatum(&fparms));
 
+	/*
+	 * Close pg_database, but keep lock till commit.
+	 */
+	table_close(pg_database_rel, NoLock);
+
 	return dboid;
 }
 
@@ -1220,43 +1490,12 @@ movedb(const char *dbname, const char *tblspcname)
 				 errdetail_busy_db(notherbackends, npreparedxacts)));
 
 	/*
-	 * Get old and new database paths
+	 * Get new database path
 	 */
 	src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
 	dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
 
 	/*
-	 * Force a checkpoint before proceeding. This will force all dirty
-	 * buffers, including those of unlogged tables, out to disk, to ensure
-	 * source database is up-to-date on disk for the copy.
-	 * FlushDatabaseBuffers() would suffice for that, but we also want to
-	 * process any pending unlink requests. Otherwise, the check for existing
-	 * files in the target directory might fail unnecessarily, not to mention
-	 * that the copy might fail due to source files getting deleted under it.
-	 * On Windows, this also ensures that background procs don't hold any open
-	 * files, which would cause rmdir() to fail.
-	 */
-	RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
-					  | CHECKPOINT_FLUSH_ALL);
-
-	/*
-	 * Now drop all buffers holding data of the target database; they should
-	 * no longer be dirty so DropDatabaseBuffers is safe.
-	 *
-	 * It might seem that we could just let these buffers age out of shared
-	 * buffers naturally, since they should not get referenced anymore.  The
-	 * problem with that is that if the user later moves the database back to
-	 * its original tablespace, any still-surviving buffers would appear to
-	 * contain valid data again --- but they'd be missing any changes made in
-	 * the database while it was in the new tablespace.  In any case, freeing
-	 * buffers that should never be used again seems worth the cycles.
-	 *
-	 * Note: it'd be sufficient to get rid of buffers matching db_id and
-	 * src_tblspcoid, but bufmgr.c presently provides no API for that.
-	 */
-	DropDatabaseBuffers(db_id);
-
-	/*
 	 * Check for existence of files in the target directory, i.e., objects of
 	 * this database that are already in the target tablespace.  We can't
 	 * allow the move in such a case, because we would need to change those
@@ -1301,28 +1540,7 @@ movedb(const char *dbname, const char *tblspcname)
 	PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
 							PointerGetDatum(&fparms));
 	{
-		/*
-		 * Copy files from the old tablespace to the new one
-		 */
-		copydir(src_dbpath, dst_dbpath, false);
-
-		/*
-		 * Record the filesystem change in XLOG
-		 */
-		{
-			xl_dbase_create_rec xlrec;
-
-			xlrec.db_id = db_id;
-			xlrec.tablespace_id = dst_tblspcoid;
-			xlrec.src_db_id = db_id;
-			xlrec.src_tablespace_id = src_tblspcoid;
-
-			XLogBeginInsert();
-			XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
-
-			(void) XLogInsert(RM_DBASE_ID,
-							  XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
-		}
+		CopyDatabase(db_id, db_id, src_tblspcoid, dst_tblspcoid);
 
 		/*
 		 * Update the database's pg_database tuple
@@ -1356,22 +1574,6 @@ movedb(const char *dbname, const char *tblspcname)
 		systable_endscan(sysscan);
 
 		/*
-		 * Force another checkpoint here.  As in CREATE DATABASE, this is to
-		 * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
-		 * operation, which would cause us to lose any unlogged operations
-		 * done in the new DB tablespace before the next checkpoint.
-		 */
-		RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
-
-		/*
-		 * Force synchronous commit, thus minimizing the window between
-		 * copying the database files and committal of the transaction. If we
-		 * crash before committing, we'll leave an orphaned set of files on
-		 * disk, which is not fatal but not good either.
-		 */
-		ForceSyncCommit();
-
-		/*
 		 * Close pg_database, but keep lock till commit.
 		 */
 		table_close(pgdbrel, NoLock);
@@ -1380,6 +1582,23 @@ movedb(const char *dbname, const char *tblspcname)
 								PointerGetDatum(&fparms));
 
 	/*
+	 * Now drop all buffers holding data of the target database; they should
+	 * no longer be dirty so DropDatabaseBuffers is safe.
+	 *
+	 * It might seem that we could just let these buffers age out of shared
+	 * buffers naturally, since they should not get referenced anymore.  The
+	 * problem with that is that if the user later moves the database back to
+	 * its original tablespace, any still-surviving buffers would appear to
+	 * contain valid data again --- but they'd be missing any changes made in
+	 * the database while it was in the new tablespace.  In any case, freeing
+	 * buffers that should never be used again seems worth the cycles.
+	 *
+	 * Note: it'd be sufficient to get rid of buffers matching db_id and
+	 * src_tblspcoid, but bufmgr.c presently provides no API for that.
+	 */
+	DropDatabaseBuffers(db_id);
+
+	/*
 	 * Commit the transaction so that the pg_database update is committed. If
 	 * we crash while removing files, the database won't be corrupt, we'll
 	 * just leave some orphaned files in the old directory.
@@ -2183,39 +2402,11 @@ dbase_redo(XLogReaderState *record)
 	if (info == XLOG_DBASE_CREATE)
 	{
 		xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
-		char	   *src_path;
-		char	   *dst_path;
-		struct stat st;
-
-		src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
-		dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
+		char	   *dbpath;
 
-		/*
-		 * Our theory for replaying a CREATE is to forcibly drop the target
-		 * subdirectory if present, then re-copy the source data. This may be
-		 * more work than needed, but it is simple to implement.
-		 */
-		if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
-		{
-			if (!rmtree(dst_path, true))
-				/* If this failed, copydir() below is going to error. */
-				ereport(WARNING,
-						(errmsg("some useless files may be left behind in old database directory \"%s\"",
-								dst_path)));
-		}
-
-		/*
-		 * Force dirty buffers out to disk, to ensure source database is
-		 * up-to-date for the copy.
-		 */
-		FlushDatabaseBuffers(xlrec->src_db_id);
-
-		/*
-		 * Copy this subdirectory to the new location
-		 *
-		 * We don't need to copy subdirectories
-		 */
-		copydir(src_path, dst_path, false);
+		dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
+		CreateDatabaseDirectory(dbpath, xlrec->db_id, xlrec->tablespace_id,
+								true);
 	}
 	else if (info == XLOG_DBASE_DROP)
 	{
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 4b296a2..e198946 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -776,24 +776,17 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
 /*
  * ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
  *		a relcache entry for the relation.
- *
- * NB: At present, this function may only be used on permanent relations, which
- * is OK, because we only use it during XLOG replay.  If in the future we
- * want to use it on temporary or unlogged relations, we could pass additional
- * parameters.
  */
 Buffer
 ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
 						  BlockNumber blockNum, ReadBufferMode mode,
-						  BufferAccessStrategy strategy)
+						  BufferAccessStrategy strategy, char relpersistence)
 {
 	bool		hit;
 
 	SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
 
-	Assert(InRecovery);
-
-	return ReadBuffer_common(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum,
+	return ReadBuffer_common(smgr, relpersistence, forkNum, blockNum,
 							 mode, strategy, &hit);
 }
 
@@ -803,7 +796,7 @@ ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
  *
  * *hit is set to true if the request was satisfied from shared buffer cache.
  */
-static Buffer
+Buffer
 ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 				  BlockNumber blockNum, ReadBufferMode mode,
 				  BufferAccessStrategy strategy, bool *hit)
diff --git a/src/backend/utils/cache/relmapper.c b/src/backend/utils/cache/relmapper.c
index 424624c..58ef902 100644
--- a/src/backend/utils/cache/relmapper.c
+++ b/src/backend/utils/cache/relmapper.c
@@ -136,7 +136,13 @@ static void apply_map_update(RelMapFile *map, Oid relationId, Oid fileNode,
 							 bool add_okay);
 static void merge_map_updates(RelMapFile *map, const RelMapFile *updates,
 							  bool add_okay);
+static void read_relmap_file(char *mapfilename, RelMapFile *map);
 static void load_relmap_file(bool shared);
+static void write_relmap_file_internal(char *mapfilename, RelMapFile *newmap,
+									   RelMapFile *realmap, bool write_wal,
+									   bool send_sinval, bool preserve_files,
+									   Oid dbid, Oid tsid, const char *dbpath,
+									   uint8 info);
 static void write_relmap_file(bool shared, RelMapFile *newmap,
 							  bool write_wal, bool send_sinval, bool preserve_files,
 							  Oid dbid, Oid tsid, const char *dbpath);
@@ -250,6 +256,32 @@ RelationMapFilenodeToOid(Oid filenode, bool shared)
 }
 
 /*
+ * DatabaseRelationOidToFilenode
+ *
+ * Find relationId's relfilenode in dbpath's relmap file (InvalidOid if none).
+ */
+Oid
+DatabaseRelationOidToFilenode(char *dbpath, Oid relationId)
+{
+	RelMapFile	map;
+	int			i;
+	char		mapfilename[MAXPGPATH];
+
+	/* read the relmap file of the database located at dbpath */
+	snprintf(mapfilename, sizeof(mapfilename), "%s/%s",
+			 dbpath, RELMAPPER_FILENAME);
+	read_relmap_file(mapfilename, &map);
+
+	for (i = 0; i < map.num_mappings; i++)
+	{
+		if (relationId == map.mappings[i].mapoid)
+			return map.mappings[i].mapfilenode;
+	}
+
+	return InvalidOid;
+}
+
+/*
  * RelationMapUpdateMap
  *
  * Install a new relfilenode mapping for the specified relation.
@@ -687,36 +719,37 @@ RestoreRelationMap(char *startAddress)
 }
 
 /*
- * load_relmap_file -- load data from the shared or local map file
- *
- * Because the map file is essential for access to core system catalogs,
- * failure to read it is a fatal error.
- *
- * Note that the local case requires DatabasePath to be set up.
+ * CreateAndCopyRelMap -- copy srcdbpath's relmap file to dstdbpath, WAL-logged.
  */
-static void
-load_relmap_file(bool shared)
+void
+CreateAndCopyRelMap(Oid dbid, Oid tsid, char *srcdbpath, char *dstdbpath)
 {
-	RelMapFile *map;
+	RelMapFile	map;
 	char		mapfilename[MAXPGPATH];
+
+	LWLockAcquire(RelationMappingLock, LW_EXCLUSIVE);
+
+	/* read the relmapfile from the source database */
+	snprintf(mapfilename, sizeof(mapfilename), "%s/%s",
+			 srcdbpath, RELMAPPER_FILENAME);
+	read_relmap_file(mapfilename, &map);
+
+	LWLockRelease(RelationMappingLock);
+
+	/* write the relmapfile of the destination database */
+	snprintf(mapfilename, sizeof(mapfilename), "%s/%s",
+			 dstdbpath, RELMAPPER_FILENAME);
+	write_relmap_file_internal(mapfilename, &map, &map, true, false, true,
+							   dbid, tsid, dstdbpath, XLOG_RELMAP_CREATE);
+}
+
+static void
+read_relmap_file(char *mapfilename, RelMapFile *map)
+{
 	pg_crc32c	crc;
 	int			fd;
 	int			r;
 
-	if (shared)
-	{
-		snprintf(mapfilename, sizeof(mapfilename), "global/%s",
-				 RELMAPPER_FILENAME);
-		map = &shared_map;
-	}
-	else
-	{
-		snprintf(mapfilename, sizeof(mapfilename), "%s/%s",
-				 DatabasePath, RELMAPPER_FILENAME);
-		map = &local_map;
-	}
-
-	/* Read data ... */
 	fd = OpenTransientFile(mapfilename, O_RDONLY | PG_BINARY);
 	if (fd < 0)
 		ereport(FATAL,
@@ -773,62 +806,44 @@ load_relmap_file(bool shared)
 }
 
 /*
- * Write out a new shared or local map file with the given contents.
- *
- * The magic number and CRC are automatically updated in *newmap.  On
- * success, we copy the data to the appropriate permanent static variable.
- *
- * If write_wal is true then an appropriate WAL message is emitted.
- * (It will be false for bootstrap and WAL replay cases.)
- *
- * If send_sinval is true then a SI invalidation message is sent.
- * (This should be true except in bootstrap case.)
+ * load_relmap_file -- load data from the shared or local map file
  *
- * If preserve_files is true then the storage manager is warned not to
- * delete the files listed in the map.
+ * Because the map file is essential for access to core system catalogs,
+ * failure to read it is a fatal error.
  *
- * Because this may be called during WAL replay when MyDatabaseId,
- * DatabasePath, etc aren't valid, we require the caller to pass in suitable
- * values.  The caller is also responsible for being sure no concurrent
- * map update could be happening.
+ * Note that the local case requires DatabasePath to be set up.
  */
 static void
-write_relmap_file(bool shared, RelMapFile *newmap,
-				  bool write_wal, bool send_sinval, bool preserve_files,
-				  Oid dbid, Oid tsid, const char *dbpath)
+load_relmap_file(bool shared)
 {
-	int			fd;
-	RelMapFile *realmap;
+	RelMapFile *map;
 	char		mapfilename[MAXPGPATH];
 
-	/*
-	 * Fill in the overhead fields and update CRC.
-	 */
-	newmap->magic = RELMAPPER_FILEMAGIC;
-	if (newmap->num_mappings < 0 || newmap->num_mappings > MAX_MAPPINGS)
-		elog(ERROR, "attempt to write bogus relation mapping");
-
-	INIT_CRC32C(newmap->crc);
-	COMP_CRC32C(newmap->crc, (char *) newmap, offsetof(RelMapFile, crc));
-	FIN_CRC32C(newmap->crc);
-
-	/*
-	 * Open the target file.  We prefer to do this before entering the
-	 * critical section, so that an open() failure need not force PANIC.
-	 */
 	if (shared)
 	{
 		snprintf(mapfilename, sizeof(mapfilename), "global/%s",
 				 RELMAPPER_FILENAME);
-		realmap = &shared_map;
+		map = &shared_map;
 	}
 	else
 	{
 		snprintf(mapfilename, sizeof(mapfilename), "%s/%s",
-				 dbpath, RELMAPPER_FILENAME);
-		realmap = &local_map;
+				 DatabasePath, RELMAPPER_FILENAME);
+		map = &local_map;
 	}
 
+	/* Read data ... */
+	read_relmap_file(mapfilename, map);
+}
+
+static void
+write_relmap_file_internal(char *mapfilename, RelMapFile *newmap,
+						   RelMapFile *realmap, bool write_wal,
+						   bool send_sinval, bool preserve_files, Oid dbid,
+						   Oid tsid, const char *dbpath, uint8 info)
+{
+	int			fd;
+
 	fd = OpenTransientFile(mapfilename, O_WRONLY | O_CREAT | PG_BINARY);
 	if (fd < 0)
 		ereport(ERROR,
@@ -852,7 +867,7 @@ write_relmap_file(bool shared, RelMapFile *newmap,
 		XLogRegisterData((char *) (&xlrec), MinSizeOfRelmapUpdate);
 		XLogRegisterData((char *) newmap, sizeof(RelMapFile));
 
-		lsn = XLogInsert(RM_RELMAP_ID, XLOG_RELMAP_UPDATE);
+		lsn = XLogInsert(RM_RELMAP_ID, info);
 
 		/* As always, WAL must hit the disk before the data update does */
 		XLogFlush(lsn);
@@ -944,6 +959,67 @@ write_relmap_file(bool shared, RelMapFile *newmap,
 }
 
 /*
+ * Write out a new shared or local map file with the given contents.
+ *
+ * The magic number and CRC are automatically updated in *newmap.  On
+ * success, we copy the data to the appropriate permanent static variable.
+ *
+ * If write_wal is true then an appropriate WAL message is emitted.
+ * (It will be false for bootstrap and WAL replay cases.)
+ *
+ * If send_sinval is true then a SI invalidation message is sent.
+ * (This should be true except in bootstrap case.)
+ *
+ * If preserve_files is true then the storage manager is warned not to
+ * delete the files listed in the map.
+ *
+ * Because this may be called during WAL replay when MyDatabaseId,
+ * DatabasePath, etc aren't valid, we require the caller to pass in suitable
+ * values.  The caller is also responsible for being sure no concurrent
+ * map update could be happening.
+ */
+static void
+write_relmap_file(bool shared, RelMapFile *newmap,
+				  bool write_wal, bool send_sinval, bool preserve_files,
+				  Oid dbid, Oid tsid, const char *dbpath)
+{
+	RelMapFile *realmap;
+	char		mapfilename[MAXPGPATH];
+
+	/*
+	 * Fill in the overhead fields and update CRC.
+	 */
+	newmap->magic = RELMAPPER_FILEMAGIC;
+	if (newmap->num_mappings < 0 || newmap->num_mappings > MAX_MAPPINGS)
+		elog(ERROR, "attempt to write bogus relation mapping");
+
+	INIT_CRC32C(newmap->crc);
+	COMP_CRC32C(newmap->crc, (char *) newmap, offsetof(RelMapFile, crc));
+	FIN_CRC32C(newmap->crc);
+
+	/*
+	 * Open the target file.  We prefer to do this before entering the
+	 * critical section, so that an open() failure need not force PANIC.
+	 */
+	if (shared)
+	{
+		snprintf(mapfilename, sizeof(mapfilename), "global/%s",
+				 RELMAPPER_FILENAME);
+		realmap = &shared_map;
+	}
+	else
+	{
+		snprintf(mapfilename, sizeof(mapfilename), "%s/%s",
+				 dbpath, RELMAPPER_FILENAME);
+		realmap = &local_map;
+	}
+
+	write_relmap_file_internal(mapfilename, newmap, realmap, write_wal,
+							   send_sinval, preserve_files, dbid, tsid,
+							   dbpath, XLOG_RELMAP_UPDATE);
+}
+
+/*
  * Merge the specified updates into the appropriate "real" map,
  * and write out the changes.  This function must be used for committing
  * updates during normal multiuser operation.
@@ -1004,7 +1080,7 @@ relmap_redo(XLogReaderState *record)
 	/* Backup blocks are not used in relmap records */
 	Assert(!XLogRecHasAnyBlockRefs(record));
 
-	if (info == XLOG_RELMAP_UPDATE)
+	if ((info == XLOG_RELMAP_UPDATE) || (info == XLOG_RELMAP_CREATE))
 	{
 		xl_relmap_update *xlrec = (xl_relmap_update *) XLogRecGetData(record);
 		RelMapFile	newmap;
@@ -1027,10 +1103,22 @@ relmap_redo(XLogReaderState *record)
 		 * so we don't bother to take the RelationMappingLock.  We would need
 		 * to do so if load_relmap_file needed to interlock against writers.
 		 */
-		write_relmap_file((xlrec->dbid == InvalidOid), &newmap,
-						  false, true, false,
-						  xlrec->dbid, xlrec->tsid, dbpath);
+		if (info == XLOG_RELMAP_UPDATE)
+			write_relmap_file((xlrec->dbid == InvalidOid), &newmap,
+							false, true, false,
+							xlrec->dbid, xlrec->tsid, dbpath);
+		else
+		{
+			char		mapfilename[MAXPGPATH];
 
+			/* Build the relmap file's path within this database directory */
+			snprintf(mapfilename, sizeof(mapfilename), "%s/%s",
+					 dbpath, RELMAPPER_FILENAME);
+
+			write_relmap_file_internal(mapfilename, &newmap, &newmap, false,
+									  false, false, xlrec->dbid, xlrec->tsid,
+									  dbpath, 0);
+		}
 		pfree(dbpath);
 	}
 	else
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 59ebac7..189123b 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -23,6 +23,7 @@
 #include "fe_utils/archive.h"
 #include "filemap.h"
 #include "pg_rewind.h"
+#include "utils/relmapper.h"
 
 /*
  * RmgrNames is an array of resource manager names, to make error messages
@@ -390,6 +391,10 @@ extractPageInfo(XLogReaderState *record)
 		 * system. No need to do anything special here.
 		 */
 	}
+	else if (rmid == RM_RELMAP_ID && info == XLOG_RELMAP_CREATE)
+	{
+		/* ignore */
+	}
 	else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_CREATE)
 	{
 		/*
diff --git a/src/include/commands/dbcommands_xlog.h b/src/include/commands/dbcommands_xlog.h
index f5ed762..9e4e382 100644
--- a/src/include/commands/dbcommands_xlog.h
+++ b/src/include/commands/dbcommands_xlog.h
@@ -23,13 +23,14 @@
 
 typedef struct xl_dbase_create_rec
 {
-	/* Records copying of a single subdirectory incl. contents */
 	Oid			db_id;
 	Oid			tablespace_id;
-	Oid			src_db_id;
-	Oid			src_tablespace_id;
+	int32       nbytes;         /* size of version data */
+	char		version[FLEXIBLE_ARRAY_MEMBER];
 } xl_dbase_create_rec;
 
+#define MinSizeOfDbaseCreateRec offsetof(xl_dbase_create_rec, version)
+
 typedef struct xl_dbase_drop_rec
 {
 	Oid			db_id;
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index aa64fb4..bef6d6a 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -184,7 +184,8 @@ extern Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum,
 								 BufferAccessStrategy strategy);
 extern Buffer ReadBufferWithoutRelcache(RelFileNode rnode,
 										ForkNumber forkNum, BlockNumber blockNum,
-										ReadBufferMode mode, BufferAccessStrategy strategy);
+										ReadBufferMode mode, BufferAccessStrategy strategy,
+										char relpersistence);
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 extern void MarkBufferDirty(Buffer buffer);
diff --git a/src/include/utils/relmapper.h b/src/include/utils/relmapper.h
index c0d14da..6f42ace 100644
--- a/src/include/utils/relmapper.h
+++ b/src/include/utils/relmapper.h
@@ -23,6 +23,7 @@
  */
 
 #define XLOG_RELMAP_UPDATE		0x00
+#define XLOG_RELMAP_CREATE		0x10
 
 typedef struct xl_relmap_update
 {
@@ -39,6 +40,8 @@ extern Oid	RelationMapOidToFilenode(Oid relationId, bool shared);
 
 extern Oid	RelationMapFilenodeToOid(Oid relationId, bool shared);
 
+extern Oid DatabaseRelationOidToFilenode(char *dbpath, Oid relationId);
+
 extern void RelationMapUpdateMap(Oid relationId, Oid fileNode, bool shared,
 								 bool immediate);
 
@@ -62,7 +65,8 @@ extern void RelationMapInitializePhase3(void);
 extern Size EstimateRelationMapSpace(void);
 extern void SerializeRelationMap(Size maxSize, char *startAddress);
 extern void RestoreRelationMap(char *startAddress);
-
+extern void CreateAndCopyRelMap(Oid dbid, Oid tsid, char *srcdbpath,
+								char *dstdbpath);
 extern void relmap_redo(XLogReaderState *record);
 extern void relmap_desc(StringInfo buf, XLogReaderState *record);
 extern const char *relmap_identify(uint8 info);
-- 
1.8.3.1

Reply via email to