At Fri, 03 Mar 2023 18:03:53 +0900 (JST), Kyotaro Horiguchi 
<horikyota....@gmail.com> wrote in 
> Correctly they are three parts. The attached patch is the first part -
> the storage mark files, which are used to identify storage files that
> have not been committed and should be removed during the next
> startup. This feature resolves the issue of orphaned storage files
> that may result from a crash occurring during the execution of a
> transaction involving the creation of a new table.
> 
> I'll post all of the three parts shortly.

Mmm. It took longer than I said, but this is the patch set that
includes all three parts.

1. "Mark files" to prevent orphan storage files for in-transaction
  created relations after a crash.

2. In-place persistence change: For ALTER TABLE SET LOGGED/UNLOGGED
  with wal_level minimal, and ALTER TABLE SET UNLOGGED with other
  wal_levels, the commands don't require a file copy for the relation
  storage. ALTER TABLE SET LOGGED with non-minimal wal_level emits
  bulk FPIs instead of a bunch of individual INSERTs.

3. An extension to ALTER TABLE SET (UN)LOGGED that can handle all
  tables in a tablespace at once.


As a side note, here is a quick overview of the behavior of the mark
files introduced by the first patch, particularly what happens when
deletion fails.

(1) The mark file for MAIN fork ("<oid>.u") corresponds to all forks,
    while the mark file for INIT fork ("<oid>_init.u") corresponds to
    INIT fork alone.

(2) The mark file is created just before the corresponding storage
    file is made. This is always logged in the WAL.

(3) The mark file is deleted after removing the corresponding storage
    file during the commit and rollback. This action is logged in the
    WAL, too. If the deletion fails, an ERROR is output and the
    transaction aborts.

(4) If a crash leaves a mark file behind, the server will try to delete
    it after successfully removing the corresponding storage file
    during the subsequent startup that runs a recovery. If deletion
    fails, the server leaves the mark file in place and emits a
    WARNING. (The same behavior applies to non-mark files.)

(5) If the deletion of the mark file fails, the leftover mark file
    prevents the creation of the corresponding storage file (causing
    an ERROR).  Because of that behavior, a leftover mark file never
    causes the wrong files to be removed.

(6) The mark file for an INIT fork is created only when ALTER TABLE
    SET UNLOGGED is executed (not for CREATE UNLOGGED TABLE) to signal
    the crash-cleanup code to remove the INIT fork. (Otherwise the
    cleanup code removes the main fork instead. This is the main
    objective of introducing the mark files.)

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From ba4b8140fe582ceec4ea810621e17d6a1fe9c408 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota....@gmail.com>
Date: Thu, 2 Mar 2023 17:25:12 +0900
Subject: [PATCH v27 1/3] Storage mark files

In certain situations, specific operations followed by a crash-restart
can result in orphaned storage files.  These files cannot be removed
through standard methods.  To address this issue, this commit
implements 'mark files' that convey information about the storage
file. Specifically, the "UNCOMMITTED" mark file is introduced to denote
files that have not been committed and should be removed during the
next startup.
---
 src/backend/access/rmgrdesc/smgrdesc.c    |  37 +++
 src/backend/access/transam/README         |  10 +
 src/backend/access/transam/xact.c         |   7 +
 src/backend/access/transam/xlogrecovery.c |  18 ++
 src/backend/backup/basebackup.c           |   9 +-
 src/backend/catalog/storage.c             | 270 ++++++++++++++++++-
 src/backend/storage/file/fd.c             |   4 +-
 src/backend/storage/file/reinit.c         | 313 +++++++++++++++-------
 src/backend/storage/smgr/md.c             |  95 ++++++-
 src/backend/storage/smgr/smgr.c           |  32 +++
 src/backend/storage/sync/sync.c           |  26 +-
 src/bin/pg_rewind/parsexlog.c             |  16 ++
 src/common/relpath.c                      |  47 ++--
 src/include/catalog/storage.h             |   3 +
 src/include/catalog/storage_xlog.h        |  35 ++-
 src/include/common/relpath.h              |   9 +-
 src/include/storage/fd.h                  |   1 +
 src/include/storage/md.h                  |   8 +-
 src/include/storage/reinit.h              |   8 +-
 src/include/storage/smgr.h                |  17 ++
 src/test/recovery/t/013_crash_restart.pl  |  21 ++
 src/tools/pgindent/typedefs.list          |   6 +
 22 files changed, 848 insertions(+), 144 deletions(-)

diff --git a/src/backend/access/rmgrdesc/smgrdesc.c b/src/backend/access/rmgrdesc/smgrdesc.c
index bd841b96e8..f8187385c4 100644
--- a/src/backend/access/rmgrdesc/smgrdesc.c
+++ b/src/backend/access/rmgrdesc/smgrdesc.c
@@ -40,6 +40,37 @@ smgr_desc(StringInfo buf, XLogReaderState *record)
 						 xlrec->blkno, xlrec->flags);
 		pfree(path);
 	}
+	else if (info == XLOG_SMGR_UNLINK)
+	{
+		xl_smgr_unlink *xlrec = (xl_smgr_unlink *) rec;
+		char	   *path = relpathperm(xlrec->rlocator, xlrec->forkNum);
+
+		appendStringInfoString(buf, path);
+		pfree(path);
+	}
+	else if (info == XLOG_SMGR_MARK)
+	{
+		xl_smgr_mark *xlrec = (xl_smgr_mark *) rec;
+		char	   *path = GetRelationPath(xlrec->rlocator.dbOid,
+										   xlrec->rlocator.spcOid,
+										   xlrec->rlocator.relNumber,
+										   InvalidBackendId,
+										   xlrec->forkNum, xlrec->mark);
+		char	   *action = "<none>";
+
+		switch (xlrec->action)
+		{
+			case XLOG_SMGR_MARK_CREATE:
+				action = "CREATE";
+				break;
+			case XLOG_SMGR_MARK_UNLINK:
+				action = "DELETE";
+				break;
+		}
+
+		appendStringInfo(buf, "%s %s", action, path);
+		pfree(path);
+	}
 }
 
 const char *
@@ -55,6 +86,12 @@ smgr_identify(uint8 info)
 		case XLOG_SMGR_TRUNCATE:
 			id = "TRUNCATE";
 			break;
+		case XLOG_SMGR_UNLINK:
+			id = "UNLINK";
+			break;
+		case XLOG_SMGR_MARK:
+			id = "MARK";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/transam/README b/src/backend/access/transam/README
index 22c8ae9755..bf83d19abd 100644
--- a/src/backend/access/transam/README
+++ b/src/backend/access/transam/README
@@ -741,6 +741,16 @@ we must panic and abort recovery.  The DBA will have to manually clean up
 then restart recovery.  This is part of the reason for not writing a WAL
 entry until we've successfully done the original action.
 
+================================
+Smgr MARK files
+--------------------------------
+
+An smgr mark file is an empty file that is created alongside a new
+relation storage file to signal that the storage file must be cleaned
+up during recovery.  In contrast to the four actions above, failing to
+remove these files will result in data loss, in which case the
+server will shut down.
+
 
 Skipping WAL for New RelFileLocator
 --------------------------------
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b876401260..acbf8f1b12 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2227,6 +2227,9 @@ CommitTransaction(void)
 	 */
 	smgrDoPendingSyncs(true, is_parallel_worker);
 
+	/* Likewise delete mark files for files created during this transaction. */
+	smgrDoPendingCleanups(true);
+
 	/* close large objects before lower-level cleanup */
 	AtEOXact_LargeObject(true);
 
@@ -2478,6 +2481,9 @@ PrepareTransaction(void)
 	 */
 	smgrDoPendingSyncs(true, false);
 
+	/* Likewise delete mark files for files created during this transaction. */
+	smgrDoPendingCleanups(true);
+
 	/* close large objects before lower-level cleanup */
 	AtEOXact_LargeObject(true);
 
@@ -2806,6 +2812,7 @@ AbortTransaction(void)
 	AfterTriggerEndXact(false); /* 'false' means it's abort */
 	AtAbort_Portals();
 	smgrDoPendingSyncs(false, is_parallel_worker);
+	smgrDoPendingCleanups(false);
 	AtEOXact_LargeObject(false);
 	AtAbort_Notify();
 	AtEOXact_RelationMap(false, is_parallel_worker);
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe9394762..4d28635f64 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -42,6 +42,7 @@
 #include "access/xlogutils.h"
 #include "backup/basebackup.h"
 #include "catalog/pg_control.h"
+#include "catalog/storage.h"
 #include "commands/tablespace.h"
 #include "common/file_utils.h"
 #include "miscadmin.h"
@@ -56,6 +57,7 @@
 #include "storage/pmsignal.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "storage/reinit.h"
 #include "storage/spin.h"
 #include "utils/builtins.h"
 #include "utils/datetime.h"
@@ -1795,6 +1797,14 @@ PerformWalRecovery(void)
 
 		RmgrCleanup();
 
+		/* cleanup garbage files left during crash recovery */
+		if (!InArchiveRecovery)
+			ResetUnloggedRelations(UNLOGGED_RELATION_DROP_BUFFER |
+								   UNLOGGED_RELATION_CLEANUP);
+
+		/* run rollback cleanup if any */
+		smgrDoPendingDeletes(false);
+
 		ereport(LOG,
 				(errmsg("redo done at %X/%X system usage: %s",
 						LSN_FORMAT_ARGS(xlogreader->ReadRecPtr),
@@ -3134,6 +3144,14 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 			{
 				ereport(DEBUG1,
 						(errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
+
+				/* cleanup garbage files left during crash recovery */
+				ResetUnloggedRelations(UNLOGGED_RELATION_DROP_BUFFER |
+									   UNLOGGED_RELATION_CLEANUP);
+
+				/* run rollback cleanup if any */
+				smgrDoPendingDeletes(false);
+
 				InArchiveRecovery = true;
 				if (StandbyModeRequested)
 					EnableStandbyMode();
diff --git a/src/backend/backup/basebackup.c b/src/backend/backup/basebackup.c
index 6efdefb591..3098977626 100644
--- a/src/backend/backup/basebackup.c
+++ b/src/backend/backup/basebackup.c
@@ -1191,6 +1191,7 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
 		ForkNumber	relForkNum; /* Type of fork if file is a relation */
 		int			relnumchars;	/* Chars in filename that are the
 									 * relnumber */
+		StorageMarks mark;		/* marker file sign */
 
 		/* Skip special stuff */
 		if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
@@ -1241,7 +1242,7 @@ sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
 		/* Exclude all forks for unlogged tables except the init fork */
 		if (isDbDir &&
 			parse_filename_for_nontemp_relation(de->d_name, &relnumchars,
-												&relForkNum))
+												&relForkNum, &mark))
 		{
 			/* Never exclude init forks */
 			if (relForkNum != INIT_FORKNUM)
@@ -1448,6 +1449,7 @@ is_checksummed_file(const char *fullpath, const char *filename)
 		strncmp(fullpath, "/", 1) == 0)
 	{
 		int			excludeIdx;
+		char	   *p;
 
 		/* Compare file against noChecksumFiles skip list */
 		for (excludeIdx = 0; noChecksumFiles[excludeIdx].name != NULL; excludeIdx++)
@@ -1461,6 +1463,11 @@ is_checksummed_file(const char *fullpath, const char *filename)
 				return false;
 		}
 
+		/* exclude mark files */
+		p = strchr(filename, '.');
+		if (p && isalpha(p[1]) && p[2] == 0)
+			return false;
+
 		return true;
 	}
 	else
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index af1491aa1d..03e06246be 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -19,6 +19,7 @@
 
 #include "postgres.h"
 
+#include "access/amapi.h"
 #include "access/parallel.h"
 #include "access/visibilitymap.h"
 #include "access/xact.h"
@@ -66,6 +67,21 @@ typedef struct PendingRelDelete
 	struct PendingRelDelete *next;	/* linked-list link */
 } PendingRelDelete;
 
+#define	PCOP_UNLINK_FORK		(1 << 0)	/* unlink a fork of the relation */
+#define	PCOP_UNLINK_MARK		(1 << 1)	/* unlink a mark file */
+
+typedef struct PendingCleanup
+{
+	RelFileLocator rlocator;	/* relation that needs a cleanup */
+	int			op;				/* mask of PCOP_* operations to perform */
+	ForkNumber	unlink_forknum; /* fork to unlink / fork the mark belongs to */
+	StorageMarks unlink_mark;	/* mark to unlink */
+	BackendId	backend;		/* InvalidBackendId if not a temp rel */
+	bool		atCommit;		/* T=clean up at commit; F=clean up at abort */
+	int			nestLevel;		/* xact nesting level of request */
+	struct PendingCleanup *next;	/* linked-list link */
+}			PendingCleanup;
+
 typedef struct PendingRelSync
 {
 	RelFileLocator rlocator;
@@ -73,6 +89,7 @@ typedef struct PendingRelSync
 } PendingRelSync;
 
 static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
+static PendingCleanup * pendingCleanups = NULL; /* head of linked list */
 static HTAB *pendingSyncHash = NULL;
 
 
@@ -123,6 +140,7 @@ RelationCreateStorage(RelFileLocator rlocator, char relpersistence,
 	SMgrRelation srel;
 	BackendId	backend;
 	bool		needs_wal;
+	PendingCleanup *pendingclean;
 
 	Assert(!IsInParallelMode());	/* couldn't update pendingSyncHash */
 
@@ -145,9 +163,23 @@ RelationCreateStorage(RelFileLocator rlocator, char relpersistence,
 			return NULL;		/* placate compiler */
 	}
 
+	/*
+	 * We are going to create a new storage file. If the server crashes before
+	 * the current transaction ends, the file needs to be cleaned up. The
+	 * SMGR_MARK_UNCOMMITTED mark file prompts that work at the next startup.
+	 * We don't need this during WAL-logged CREATE DATABASE. See
+	 * CreateAndCopyRelationData for details.
+	 */
 	srel = smgropen(rlocator, backend);
+
+	if (register_delete)
+	{
+		log_smgrcreatemark(&rlocator, MAIN_FORKNUM, SMGR_MARK_UNCOMMITTED);
+		smgrcreatemark(srel, MAIN_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+	}
+
 	smgrcreate(srel, MAIN_FORKNUM, false);
-
+	
 	if (needs_wal)
 		log_smgrcreate(&srel->smgr_rlocator.locator, MAIN_FORKNUM);
 
@@ -157,16 +189,29 @@ RelationCreateStorage(RelFileLocator rlocator, char relpersistence,
 	 */
 	if (register_delete)
 	{
-		PendingRelDelete *pending;
+		PendingRelDelete *pendingdel;
 
-		pending = (PendingRelDelete *)
+		pendingdel = (PendingRelDelete *)
 			MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
-		pending->rlocator = rlocator;
-		pending->backend = backend;
-		pending->atCommit = false;	/* delete if abort */
-		pending->nestLevel = GetCurrentTransactionNestLevel();
-		pending->next = pendingDeletes;
-		pendingDeletes = pending;
+		pendingdel->rlocator = rlocator;
+		pendingdel->backend = backend;
+		pendingdel->atCommit = false;	/* delete if abort */
+		pendingdel->nestLevel = GetCurrentTransactionNestLevel();
+		pendingdel->next = pendingDeletes;
+		pendingDeletes = pendingdel;
+
+		/* drop mark files at commit */
+		pendingclean = (PendingCleanup *)
+			MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+		pendingclean->rlocator = rlocator;
+		pendingclean->op = PCOP_UNLINK_MARK;
+		pendingclean->unlink_forknum = MAIN_FORKNUM;
+		pendingclean->unlink_mark = SMGR_MARK_UNCOMMITTED;
+		pendingclean->backend = backend;
+		pendingclean->atCommit = true;
+		pendingclean->nestLevel = GetCurrentTransactionNestLevel();
+		pendingclean->next = pendingCleanups;
+		pendingCleanups = pendingclean;
 	}
 
 	if (relpersistence == RELPERSISTENCE_PERMANENT && !XLogIsNeeded())
@@ -197,6 +242,69 @@ log_smgrcreate(const RelFileLocator *rlocator, ForkNumber forkNum)
 	XLogInsert(RM_SMGR_ID, XLOG_SMGR_CREATE | XLR_SPECIAL_REL_UPDATE);
 }
 
+/*
+ * Perform XLogInsert of an XLOG_SMGR_UNLINK record for the given fork to WAL.
+ */
+void
+log_smgrunlink(const RelFileLocator *rlocator, ForkNumber forkNum)
+{
+	xl_smgr_unlink xlrec;
+
+	/*
+	 * Make an XLOG entry reporting the unlink of the given relation fork.
+	 */
+	xlrec.rlocator = *rlocator;
+	xlrec.forkNum = forkNum;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+	XLogInsert(RM_SMGR_ID, XLOG_SMGR_UNLINK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_MARK record (MARK_CREATE action) to WAL.
+ */
+void
+log_smgrcreatemark(const RelFileLocator *rlocator, ForkNumber forkNum,
+				   StorageMarks mark)
+{
+	xl_smgr_mark xlrec;
+
+	/*
+	 * Make an XLOG entry reporting the mark file creation.
+	 */
+	xlrec.rlocator = *rlocator;
+	xlrec.forkNum = forkNum;
+	xlrec.mark = mark;
+	xlrec.action = XLOG_SMGR_MARK_CREATE;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+	XLogInsert(RM_SMGR_ID, XLOG_SMGR_MARK | XLR_SPECIAL_REL_UPDATE);
+}
+
+/*
+ * Perform XLogInsert of an XLOG_SMGR_MARK record (MARK_UNLINK action) to WAL.
+ */
+void
+log_smgrunlinkmark(const RelFileLocator *rlocator, ForkNumber forkNum,
+				   StorageMarks mark)
+{
+	xl_smgr_mark xlrec;
+
+	/*
+	 * Make an XLOG entry reporting the mark file removal.
+	 */
+	xlrec.rlocator = *rlocator;
+	xlrec.forkNum = forkNum;
+	xlrec.mark = mark;
+	xlrec.action = XLOG_SMGR_MARK_UNLINK;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+	XLogInsert(RM_SMGR_ID, XLOG_SMGR_MARK | XLR_SPECIAL_REL_UPDATE);
+}
+
 /*
  * RelationDropStorage
  *		Schedule unlinking of physical storage at transaction commit.
@@ -711,6 +819,76 @@ smgrDoPendingDeletes(bool isCommit)
 	}
 }
 
+/*
+ *	smgrDoPendingCleanups() -- Clean up work that emits WAL records
+ *
+ *  The operations handled in this function emit WAL records, which must be
+ *  part of the current transaction.
+ */
+void
+smgrDoPendingCleanups(bool isCommit)
+{
+	int			nestLevel = GetCurrentTransactionNestLevel();
+	PendingCleanup *pending;
+	PendingCleanup *prev;
+	PendingCleanup *next;
+
+	prev = NULL;
+	for (pending = pendingCleanups; pending != NULL; pending = next)
+	{
+		next = pending->next;
+		if (pending->nestLevel < nestLevel)
+		{
+			/* outer-level entries are kept on the list for a later call */
+			prev = pending;
+		}
+		else
+		{
+			/* unlink list entry first, so we don't retry on failure */
+			if (prev)
+				prev->next = next;
+			else
+				pendingCleanups = next;
+
+			/* act only on entries registered for this outcome (commit/abort) */
+			if (pending->atCommit == isCommit)
+			{
+				SMgrRelation srel;
+
+				srel = smgropen(pending->rlocator, pending->backend);
+
+				Assert((pending->op &
+						~(PCOP_UNLINK_FORK | PCOP_UNLINK_MARK)) == 0);
+
+				if (pending->op & PCOP_UNLINK_FORK)
+				{
+					/* Don't emit WAL during recovery. */
+					if (!InRecovery)
+						log_smgrunlink(&pending->rlocator,
+									   pending->unlink_forknum);
+					smgrunlink(srel, pending->unlink_forknum, false);
+				}
+
+				if (pending->op & PCOP_UNLINK_MARK)
+				{
+					if (!InRecovery)
+						log_smgrunlinkmark(&pending->rlocator,
+										   pending->unlink_forknum,
+										   pending->unlink_mark);
+
+					smgrunlinkmark(srel, pending->unlink_forknum,
+								   pending->unlink_mark, InRecovery);
+					smgrclose(srel);
+				}
+			}
+
+			/* must explicitly free the list entry */
+			pfree(pending);
+			/* prev does not change */
+		}
+	}
+}
+
 /*
  *	smgrDoPendingSyncs() -- Take care of relation syncs at end of xact.
  */
@@ -971,6 +1149,15 @@ smgr_redo(XLogReaderState *record)
 		reln = smgropen(xlrec->rlocator, InvalidBackendId);
 		smgrcreate(reln, xlrec->forkNum, true);
 	}
+	else if (info == XLOG_SMGR_UNLINK)
+	{
+		xl_smgr_unlink *xlrec = (xl_smgr_unlink *) XLogRecGetData(record);
+		SMgrRelation reln;
+
+		reln = smgropen(xlrec->rlocator, InvalidBackendId);
+		smgrunlink(reln, xlrec->forkNum, true);
+		smgrclose(reln);
+	}
 	else if (info == XLOG_SMGR_TRUNCATE)
 	{
 		xl_smgr_truncate *xlrec = (xl_smgr_truncate *) XLogRecGetData(record);
@@ -1059,6 +1246,71 @@ smgr_redo(XLogReaderState *record)
 
 		FreeFakeRelcacheEntry(rel);
 	}
+	else if (info == XLOG_SMGR_MARK)
+	{
+		xl_smgr_mark *xlrec = (xl_smgr_mark *) XLogRecGetData(record);
+		SMgrRelation reln;
+		PendingCleanup *pending;
+		bool		created = false;
+
+		reln = smgropen(xlrec->rlocator, InvalidBackendId);
+
+		switch (xlrec->action)
+		{
+			case XLOG_SMGR_MARK_CREATE:
+				smgrcreatemark(reln, xlrec->forkNum, xlrec->mark, true);
+				created = true;
+				break;
+			case XLOG_SMGR_MARK_UNLINK:
+				smgrunlinkmark(reln, xlrec->forkNum, xlrec->mark, true);
+				break;
+			default:
+				elog(ERROR, "unknown smgr_mark action \"%c\"", xlrec->mark);
+		}
+
+		if (created)
+		{
+			/* revert mark file operation at abort */
+			pending = (PendingCleanup *)
+				MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+			pending->rlocator = xlrec->rlocator;
+			pending->op = PCOP_UNLINK_MARK;
+			pending->unlink_forknum = xlrec->forkNum;
+			pending->unlink_mark = xlrec->mark;
+			pending->backend = InvalidBackendId;
+			pending->atCommit = false;
+			pending->nestLevel = GetCurrentTransactionNestLevel();
+			pending->next = pendingCleanups;
+			pendingCleanups = pending;
+		}
+		else
+		{
+			/*
+			 * Delete pending action for this mark file if any. We should have
+			 * at most one entry for this action.
+			 */
+			PendingCleanup *prev = NULL;
+
+			for (pending = pendingCleanups; pending != NULL;
+				 pending = pending->next)
+			{
+				if (RelFileLocatorEquals(xlrec->rlocator, pending->rlocator) &&
+					pending->unlink_forknum == xlrec->forkNum &&
+					(pending->op & PCOP_UNLINK_MARK) != 0)
+				{
+					if (prev)
+						prev->next = pending->next;
+					else
+						pendingCleanups = pending->next;
+
+					pfree(pending);
+					break;
+				}
+
+				prev = pending;
+			}
+		}
+	}
 	else
 		elog(PANIC, "smgr_redo: unknown op code %u", info);
 }
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 9fd8444ed4..1b77347978 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -346,8 +346,6 @@ static void pre_sync_fname(const char *fname, bool isdir, int elevel);
 static void datadir_fsync_fname(const char *fname, bool isdir, int elevel);
 static void unlink_if_exists_fname(const char *fname, bool isdir, int elevel);
 
-static int	fsync_parent_path(const char *fname, int elevel);
-
 
 /*
  * pg_fsync --- do fsync with or without writethrough
@@ -3670,7 +3668,7 @@ fsync_fname_ext(const char *fname, bool isdir, bool ignore_perm, int elevel)
  * This is aimed at making file operations persistent on disk in case of
  * an OS crash or power failure.
  */
-static int
+int
 fsync_parent_path(const char *fname, int elevel)
 {
 	char		parentpath[MAXPGPATH];
diff --git a/src/backend/storage/file/reinit.c b/src/backend/storage/file/reinit.c
index fb55371b1b..250cfe9e44 100644
--- a/src/backend/storage/file/reinit.c
+++ b/src/backend/storage/file/reinit.c
@@ -16,29 +16,45 @@
 
 #include <unistd.h>
 
+#include "access/xlogrecovery.h"
+#include "catalog/pg_tablespace_d.h"
 #include "common/relpath.h"
 #include "postmaster/startup.h"
+#include "storage/bufmgr.h"
 #include "storage/copydir.h"
 #include "storage/fd.h"
+#include "storage/md.h"
 #include "storage/reinit.h"
+#include "storage/smgr.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
 
 static void ResetUnloggedRelationsInTablespaceDir(const char *tsdirname,
-												  int op);
+												  Oid tspid, int op);
 static void ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
-											   int op);
+											   Oid tspid, Oid dbid, int op);
 
 typedef struct
 {
-	Oid			reloid;			/* hash key */
-} unlogged_relation_entry;
+ 	RelFileNumber	relNumber;		/* hash key */
+	bool			has_init;		/* has INIT fork */
+	bool			dirty_all;		/* needs to remove all forks */
+}  relfile_entry;
 
 /*
- * Reset unlogged relations from before the last restart.
+ * Clean up and reset relation files from before the last restart.
  *
- * If op includes UNLOGGED_RELATION_CLEANUP, we remove all forks of any
- * relation with an "init" fork, except for the "init" fork itself.
+ * If op includes UNLOGGED_RELATION_CLEANUP, we perform different operations
+ * depending on the existence of mark files.
+ *
+ * If the SMGR_MARK_UNCOMMITTED mark file for the main fork is present, we
+ * remove the whole relation along with the mark file.
+ *
+ * Otherwise, if the "init" fork is found, we remove all forks of any relation
+ * with the "init" fork, except for the "init" fork itself.
+ *
+ * If op includes UNLOGGED_RELATION_DROP_BUFFER, we drop all buffers for all
+ * relations that are to be cleaned up.
  *
  * If op includes UNLOGGED_RELATION_INIT, we copy the "init" fork to the main
  * fork.
@@ -72,7 +88,7 @@ ResetUnloggedRelations(int op)
 	/*
 	 * First process unlogged files in pg_default ($PGDATA/base)
 	 */
-	ResetUnloggedRelationsInTablespaceDir("base", op);
+	ResetUnloggedRelationsInTablespaceDir("base", DEFAULTTABLESPACE_OID, op);
 
 	/*
 	 * Cycle through directories for all non-default tablespaces.
@@ -81,13 +97,19 @@ ResetUnloggedRelations(int op)
 
 	while ((spc_de = ReadDir(spc_dir, "pg_tblspc")) != NULL)
 	{
+		Oid			tspid;
+
 		if (strcmp(spc_de->d_name, ".") == 0 ||
 			strcmp(spc_de->d_name, "..") == 0)
 			continue;
 
 		snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s",
 				 spc_de->d_name, TABLESPACE_VERSION_DIRECTORY);
-		ResetUnloggedRelationsInTablespaceDir(temp_path, op);
+
+		tspid = atooid(spc_de->d_name);
+
+		Assert(tspid != 0);
+		ResetUnloggedRelationsInTablespaceDir(temp_path, tspid, op);
 	}
 
 	FreeDir(spc_dir);
@@ -103,7 +125,8 @@ ResetUnloggedRelations(int op)
  * Process one tablespace directory for ResetUnloggedRelations
  */
 static void
-ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
+ResetUnloggedRelationsInTablespaceDir(const char *tsdirname,
+									  Oid tspid, int op)
 {
 	DIR		   *ts_dir;
 	struct dirent *de;
@@ -130,6 +153,8 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
 
 	while ((de = ReadDir(ts_dir, tsdirname)) != NULL)
 	{
+		Oid			dbid;
+
 		/*
 		 * We're only interested in the per-database directories, which have
 		 * numeric names.  Note that this code will also (properly) ignore "."
@@ -148,7 +173,10 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
 			ereport_startup_progress("resetting unlogged relations (cleanup), elapsed time: %ld.%02d s, current path: %s",
 									 dbspace_path);
 
-		ResetUnloggedRelationsInDbspaceDir(dbspace_path, op);
+		dbid = atooid(de->d_name);
+		Assert(dbid != 0);
+
+		ResetUnloggedRelationsInDbspaceDir(dbspace_path, tspid, dbid, op);
 	}
 
 	FreeDir(ts_dir);
@@ -158,125 +186,200 @@ ResetUnloggedRelationsInTablespaceDir(const char *tsdirname, int op)
  * Process one per-dbspace directory for ResetUnloggedRelations
  */
 static void
-ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
+ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
+								   Oid tspid, Oid dbid, int op)
 {
 	DIR		   *dbspace_dir;
 	struct dirent *de;
 	char		rm_path[MAXPGPATH * 2];
+	HTAB	   *hash;
+	HASHCTL		ctl;
 
 	/* Caller must specify at least one operation. */
-	Assert((op & (UNLOGGED_RELATION_CLEANUP | UNLOGGED_RELATION_INIT)) != 0);
+	Assert((op & (UNLOGGED_RELATION_CLEANUP |
+				  UNLOGGED_RELATION_DROP_BUFFER |
+				  UNLOGGED_RELATION_INIT)) != 0);
 
 	/*
 	 * Cleanup is a two-pass operation.  First, we go through and identify all
 	 * the files with init forks.  Then, we go through again and nuke
 	 * everything with the same OID except the init fork.
 	 */
+
+	/*
+	 * It's possible that someone could create tons of unlogged relations in
+	 * the same database & tablespace, so we'd better use a hash table rather
+	 * than an array or linked list to keep track of which files need to be
+	 * reset.  Otherwise, this cleanup operation would be O(n^2).
+	 */
+	memset(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(RelFileNumber);
+	ctl.entrysize = sizeof(relfile_entry);
+	hash = hash_create("unlogged relation RelFileNumbers",
+					   32, &ctl, HASH_ELEM | HASH_BLOBS);
+
+	/* Collect INIT fork and mark files in the directory. */
+	dbspace_dir = AllocateDir(dbspacedirname);
+	while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
+	{
+		ForkNumber	forkNum;
+		int			relnumchars;
+		StorageMarks mark;
+
+		/* Skip anything that doesn't look like a relation data file. */
+		if (!parse_filename_for_nontemp_relation(de->d_name, &relnumchars,
+												 &forkNum, &mark))
+			continue;
+
+		if (forkNum == INIT_FORKNUM || mark == SMGR_MARK_UNCOMMITTED)
+		{
+			RelFileNumber key;
+			relfile_entry *ent;
+			bool		found;
+
+			/*
+			 * Put the OID portion of the name into the hash table,
+			 * if it isn't already.  If it has SMGR_MARK_UNCOMMITTED mark
+			 * files, the storage file is in dirty state, where clean up is
+			 * needed.
+			 */
+			key = atooid(de->d_name);
+			ent = hash_search(hash, &key, HASH_ENTER, &found);
+
+			if (!found)
+			{
+				ent->has_init = false;
+				ent->dirty_all = false;
+			}
+
+			if (forkNum == MAIN_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+				ent->dirty_all = true;
+			else
+			{
+				Assert(forkNum == INIT_FORKNUM);
+				ent->has_init = true;
+			}
+		}
+	}
+
+	/* Done with the first pass. */
+	FreeDir(dbspace_dir);
+
+	/* nothing to do if we found neither init forks nor uncommitted marks */
+	if (hash_get_num_entries(hash) < 1)
+	{
+		hash_destroy(hash);
+		return;
+	}
+
+	if ((op & UNLOGGED_RELATION_DROP_BUFFER) != 0)
+	{
+		/*
+		 * When we come here after recovery, smgr object for this file might
+		 * have been created. In that case we need to drop all buffers then the
+		 * smgr object.  Otherwise checkpointer wrongly tries to flush buffers
+		 * for nonexistent relation storage. This is safe as far as no other
+		 * backends have accessed the relation before starting archive
+		 * recovery.
+		 */
+		HASH_SEQ_STATUS status;
+		relfile_entry *ent;
+		SMgrRelation *srels = palloc(sizeof(SMgrRelation) * 8);
+		int			maxrels = 8;
+		int			nrels = 0;
+		int			i;
+
+		Assert(!HotStandbyActive());
+
+		hash_seq_init(&status, hash);
+		while ((ent = (relfile_entry *) hash_seq_search(&status)) != NULL)
+		{
+			RelFileLocatorBackend rel;
+
+			if (maxrels <= nrels)
+			{
+				maxrels *= 2;
+				srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
+			}
+
+			rel.backend = InvalidBackendId;
+			rel.locator.spcOid = tspid;
+			rel.locator.dbOid = dbid;
+			rel.locator.relNumber = ent->relNumber;
+
+			srels[nrels++] = smgropen(rel.locator, InvalidBackendId);
+		}
+
+		DropRelationsAllBuffers(srels, nrels);
+
+		for (i = 0; i < nrels; i++)
+			smgrclose(srels[i]);
+	}
+
+	/*
+	 * Now, make a second pass and remove anything that matches.
+	 */
 	if ((op & UNLOGGED_RELATION_CLEANUP) != 0)
 	{
-		HTAB	   *hash;
-		HASHCTL		ctl;
-
-		/*
-		 * It's possible that someone could create a ton of unlogged relations
-		 * in the same database & tablespace, so we'd better use a hash table
-		 * rather than an array or linked list to keep track of which files
-		 * need to be reset.  Otherwise, this cleanup operation would be
-		 * O(n^2).
-		 */
-		ctl.keysize = sizeof(Oid);
-		ctl.entrysize = sizeof(unlogged_relation_entry);
-		ctl.hcxt = CurrentMemoryContext;
-		hash = hash_create("unlogged relation OIDs", 32, &ctl,
-						   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
-		/* Scan the directory. */
 		dbspace_dir = AllocateDir(dbspacedirname);
 		while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
 		{
 			ForkNumber	forkNum;
+			StorageMarks mark;
 			int			relnumchars;
-			unlogged_relation_entry ent;
+			RelFileNumber key;
+			relfile_entry *ent;
+			RelFileLocatorBackend rel;
 
 			/* Skip anything that doesn't look like a relation data file. */
 			if (!parse_filename_for_nontemp_relation(de->d_name, &relnumchars,
-													 &forkNum))
-				continue;
-
-			/* Also skip it unless this is the init fork. */
-			if (forkNum != INIT_FORKNUM)
-				continue;
-
-			/*
-			 * Put the OID portion of the name into the hash table, if it
-			 * isn't already.
-			 */
-			ent.reloid = atooid(de->d_name);
-			(void) hash_search(hash, &ent, HASH_ENTER, NULL);
-		}
-
-		/* Done with the first pass. */
-		FreeDir(dbspace_dir);
-
-		/*
-		 * If we didn't find any init forks, there's no point in continuing;
-		 * we can bail out now.
-		 */
-		if (hash_get_num_entries(hash) == 0)
-		{
-			hash_destroy(hash);
-			return;
-		}
-
-		/*
-		 * Now, make a second pass and remove anything that matches.
-		 */
-		dbspace_dir = AllocateDir(dbspacedirname);
-		while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
-		{
-			ForkNumber	forkNum;
-			int			relnumchars;
-			unlogged_relation_entry ent;
-
-			/* Skip anything that doesn't look like a relation data file. */
-			if (!parse_filename_for_nontemp_relation(de->d_name, &relnumchars,
-													 &forkNum))
-				continue;
-
-			/* We never remove the init fork. */
-			if (forkNum == INIT_FORKNUM)
+													 &forkNum, &mark))
 				continue;
 
 			/*
 			 * See whether the OID portion of the name shows up in the hash
 			 * table.  If so, nuke it!
 			 */
-			ent.reloid = atooid(de->d_name);
-			if (hash_search(hash, &ent, HASH_FIND, NULL))
+			key = atooid(de->d_name);
+			ent = hash_search(hash, &key, HASH_FIND, NULL);
+
+			if (!ent)
+				continue;
+
+			if (!ent->dirty_all)
 			{
-				snprintf(rm_path, sizeof(rm_path), "%s/%s",
-						 dbspacedirname, de->d_name);
-				if (unlink(rm_path) < 0)
-					ereport(ERROR,
-							(errcode_for_file_access(),
-							 errmsg("could not remove file \"%s\": %m",
-									rm_path)));
-				else
-					elog(DEBUG2, "unlinked file \"%s\"", rm_path);
+				/* clean permanent relations don't need cleanup */
+				if (!ent->has_init)
+					continue;
+
+				if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_NONE)
+					continue;
 			}
+
+			/* so, nuke it! */
+			snprintf(rm_path, sizeof(rm_path), "%s/%s",
+					 dbspacedirname, de->d_name);
+			if (unlink(rm_path) < 0)
+				ereport(ERROR,
+						errcode_for_file_access(),
+						errmsg("could not remove file \"%s\": %m",
+							   rm_path));
+
+			rel.backend = InvalidBackendId;
+			rel.locator.spcOid = tspid;
+			rel.locator.dbOid = dbid;
+			rel.locator.relNumber = atooid(de->d_name);
+
+			ForgetRelationForkSyncRequests(rel, forkNum);
 		}
 
 		/* Cleanup is complete. */
 		FreeDir(dbspace_dir);
-		hash_destroy(hash);
 	}
 
 	/*
 	 * Initialization happens after cleanup is complete: we copy each init
-	 * fork file to the corresponding main fork file.  Note that if we are
-	 * asked to do both cleanup and init, we may never get here: if the
-	 * cleanup code determines that there are no init forks in this dbspace,
-	 * it will return before we get to this point.
+	 * fork file to the corresponding main fork file.
 	 */
 	if ((op & UNLOGGED_RELATION_INIT) != 0)
 	{
@@ -285,6 +388,7 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 		while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
 		{
 			ForkNumber	forkNum;
+			StorageMarks mark;
 			int			relnumchars;
 			char		relnumbuf[OIDCHARS + 1];
 			char		srcpath[MAXPGPATH * 2];
@@ -292,9 +396,11 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 
 			/* Skip anything that doesn't look like a relation data file. */
 			if (!parse_filename_for_nontemp_relation(de->d_name, &relnumchars,
-													 &forkNum))
+													 &forkNum, &mark))
 				continue;
 
+			Assert(mark == SMGR_MARK_NONE);
+
 			/* Also skip it unless this is the init fork. */
 			if (forkNum != INIT_FORKNUM)
 				continue;
@@ -328,15 +434,18 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
 		while ((de = ReadDir(dbspace_dir, dbspacedirname)) != NULL)
 		{
 			ForkNumber	forkNum;
+			StorageMarks mark;
 			int			relnumchars;
 			char		relnumbuf[OIDCHARS + 1];
 			char		mainpath[MAXPGPATH];
 
 			/* Skip anything that doesn't look like a relation data file. */
 			if (!parse_filename_for_nontemp_relation(de->d_name, &relnumchars,
-													 &forkNum))
+													 &forkNum, &mark))
 				continue;
 
+			Assert(mark == SMGR_MARK_NONE);
+
 			/* Also skip it unless this is the init fork. */
 			if (forkNum != INIT_FORKNUM)
 				continue;
@@ -379,7 +488,7 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname, int op)
  */
 bool
 parse_filename_for_nontemp_relation(const char *name, int *relnumchars,
-									ForkNumber *fork)
+									ForkNumber *fork, StorageMarks *mark)
 {
 	int			pos;
 
@@ -410,11 +519,19 @@ parse_filename_for_nontemp_relation(const char *name, int *relnumchars,
 
 		for (segchar = 1; isdigit((unsigned char) name[pos + segchar]); ++segchar)
 			;
-		if (segchar <= 1)
-			return false;
-		pos += segchar;
+		if (segchar > 1)
+			pos += segchar;
 	}
 
+	/* mark file? */
+	if (name[pos] == '.' && name[pos + 1] != 0)
+	{
+		*mark = name[pos + 1];
+		pos += 2;
+	}
+	else
+		*mark = SMGR_MARK_NONE;
+
 	/* Now we should be at the end. */
 	if (name[pos] != '\0')
 		return false;
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 352958e1fe..0b64635fb8 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -141,7 +141,8 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forknum,
 							 BlockNumber blkno, bool skipFsync, int behavior);
 static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
 							  MdfdVec *seg);
-
+static bool mdmarkexists(SMgrRelation reln, ForkNumber forkNum,
+						 StorageMarks mark);
 
 /*
  *	mdinit() -- Initialize private state for magnetic disk storage manager.
@@ -173,6 +174,82 @@ mdexists(SMgrRelation reln, ForkNumber forknum)
 	return (mdopenfork(reln, forknum, EXTENSION_RETURN_NULL) != NULL);
 }
 
+/*
+ *  mdcreatemark() -- Create a mark file.
+ *
+ * If isRedo is true, it's okay for the file to exist already: the existing
+ * file is then opened instead of created, so that it still gets fsync'ed and
+ * we never pass an invalid descriptor to pg_fsync() and close().
+ */
+void
+mdcreatemark(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark,
+			 bool isRedo)
+{
+	char	   *path = markpath(reln->smgr_rlocator, forkNum, mark);
+	int			fd;
+
+	/* See mdcreate for details. */
+	TablespaceCreateDbspace(reln->smgr_rlocator.locator.spcOid,
+							reln->smgr_rlocator.locator.dbOid,
+							isRedo);
+
+	/* O_EXCL is applied only outside redo; see the header comment. */
+	fd = BasicOpenFile(path, O_WRONLY | O_CREAT | (isRedo ? 0 : O_EXCL));
+	if (fd < 0)
+		ereport(ERROR,
+				errcode_for_file_access(),
+				errmsg("could not create mark file \"%s\": %m", path));
+
+	pg_fsync(fd);
+	close(fd);
+
+	/* fsync the parent directory to make the file creation persistent */
+	fsync_parent_path(path, ERROR);
+
+	pfree(path);
+}
+
+
+/*
+ *  mdunlinkmark()  -- Delete the mark file of a fork.
+ *
+ * If isRedo is true, it's okay for the file not to exist; it is then skipped.
+ */
+void
+mdunlinkmark(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark,
+			 bool isRedo)
+{
+	char	   *path = markpath(reln->smgr_rlocator, forkNum, mark);
+
+	if (!isRedo || mdmarkexists(reln, forkNum, mark))
+		durable_unlink(path, ERROR);
+
+	pfree(path);
+}
+
+/*
+ *  mdmarkexists()  -- Check if the mark file exists (ENOENT means it doesn't).
+ */
+static bool
+mdmarkexists(SMgrRelation reln, ForkNumber forkNum, StorageMarks mark)
+{
+	char	   *path = markpath(reln->smgr_rlocator, forkNum, mark);
+	int			fd;
+
+	fd = BasicOpenFile(path, O_RDONLY);
+	if (fd < 0 && errno != ENOENT)
+		ereport(ERROR,
+				errcode_for_file_access(),
+				errmsg("could not access mark file \"%s\": %m", path));
+	pfree(path);
+
+	if (fd < 0)
+		return false;
+
+	close(fd);
+	return true;
+}
+
 /*
  *	mdcreate() -- Create a new relation on magnetic disk.
  *
@@ -1085,6 +1162,16 @@ register_forget_request(RelFileLocatorBackend rlocator, ForkNumber forknum,
 	RegisterSyncRequest(&tag, SYNC_FORGET_REQUEST, true /* retryOnError */ );
 }
 
+/*
+ * ForgetRelationForkSyncRequests -- forget any fsyncs and unlinks for a fork
+ */
+void
+ForgetRelationForkSyncRequests(RelFileLocatorBackend rlocator,
+							   ForkNumber forknum)
+{
+	register_forget_request(rlocator, forknum, 0);
+}
+
 /*
  * ForgetDatabaseSyncRequests -- forget any fsyncs and unlinks for a DB
  */
@@ -1445,12 +1532,14 @@ mdsyncfiletag(const FileTag *ftag, char *path)
  * Return 0 on success, -1 on failure, with errno set.
  */
 int
-mdunlinkfiletag(const FileTag *ftag, char *path)
+mdunlinkfiletag(const FileTag *ftag, char *path, StorageMarks mark)
 {
 	char	   *p;
 
 	/* Compute the path. */
-	p = relpathperm(ftag->rlocator, MAIN_FORKNUM);
+	p = GetRelationPath(ftag->rlocator.dbOid, ftag->rlocator.spcOid,
+						ftag->rlocator.relNumber,InvalidBackendId,
+						MAIN_FORKNUM, mark);
 	strlcpy(path, p, MAXPGPATH);
 	pfree(p);
 
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index dc466e5414..9969d84209 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -62,6 +62,10 @@ typedef struct f_smgr
 	void		(*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
 								  BlockNumber nblocks);
 	void		(*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
+	void		(*smgr_createmark) (SMgrRelation reln, ForkNumber forknum,
+									StorageMarks mark, bool isRedo);
+	void		(*smgr_unlinkmark) (SMgrRelation reln, ForkNumber forknum,
+									StorageMarks mark, bool isRedo);
 } f_smgr;
 
 static const f_smgr smgrsw[] = {
@@ -82,6 +86,8 @@ static const f_smgr smgrsw[] = {
 		.smgr_nblocks = mdnblocks,
 		.smgr_truncate = mdtruncate,
 		.smgr_immedsync = mdimmedsync,
+		.smgr_createmark = mdcreatemark,
+		.smgr_unlinkmark = mdunlinkmark,
 	}
 };
 
@@ -371,6 +377,26 @@ smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo)
 	smgrsw[reln->smgr_which].smgr_create(reln, forknum, isRedo);
 }
 
+/*
+ *	smgrcreatemark() -- Create a mark file via the storage manager of reln
+ */
+void
+smgrcreatemark(SMgrRelation reln, ForkNumber forknum, StorageMarks mark,
+			   bool isRedo)
+{
+	smgrsw[reln->smgr_which].smgr_createmark(reln, forknum, mark, isRedo);
+}
+
+/*
+ *	smgrunlinkmark() -- Delete a mark file via the storage manager of reln
+ */
+void
+smgrunlinkmark(SMgrRelation reln, ForkNumber forknum, StorageMarks mark,
+			   bool isRedo)
+{
+	smgrsw[reln->smgr_which].smgr_unlinkmark(reln, forknum, mark, isRedo);
+}
+
 /*
  *	smgrdosyncall() -- Immediately sync all forks of all given relations
  *
@@ -693,6 +719,12 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
 	smgrsw[reln->smgr_which].smgr_immedsync(reln, forknum);
 }
 
+void
+smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo)
+{
+	smgrsw[reln->smgr_which].smgr_unlink(reln->smgr_rlocator, forknum, isRedo);
+}
+
 /*
  * AtEOXact_SMgr
  *
diff --git a/src/backend/storage/sync/sync.c b/src/backend/storage/sync/sync.c
index 768d1dbfc4..16cf74702e 100644
--- a/src/backend/storage/sync/sync.c
+++ b/src/backend/storage/sync/sync.c
@@ -91,7 +91,8 @@ static CycleCtr checkpoint_cycle_ctr = 0;
 typedef struct SyncOps
 {
 	int			(*sync_syncfiletag) (const FileTag *ftag, char *path);
-	int			(*sync_unlinkfiletag) (const FileTag *ftag, char *path);
+	int			(*sync_unlinkfiletag) (const FileTag *ftag, char *path,
+									   StorageMarks mark);
 	bool		(*sync_filetagmatches) (const FileTag *ftag,
 										const FileTag *candidate);
 } SyncOps;
@@ -235,7 +236,8 @@ SyncPostCheckpoint(void)
 
 		/* Unlink the file */
 		if (syncsw[entry->tag.handler].sync_unlinkfiletag(&entry->tag,
-														  path) < 0)
+														  path,
+														  SMGR_MARK_NONE) < 0)
 		{
 			/*
 			 * There's a race condition, when the database is dropped at the
@@ -244,6 +246,26 @@ SyncPostCheckpoint(void)
 			 * here. rmtree() also has to ignore ENOENT errors, to deal with
 			 * the possibility that we delete the file first.
 			 */
+			if (errno != ENOENT)
+				ereport(WARNING,
+						errcode_for_file_access(),
+						errmsg("could not remove file \"%s\": %m", path));
+		}
+		else if (syncsw[entry->tag.handler].sync_unlinkfiletag(&entry->tag,
+															   path,
+															   SMGR_MARK_UNCOMMITTED)
+				 < 0)
+		{
+			/*
+			 * We might also have SMGR_MARK_UNCOMMITTED file.  Remove it if the
+			 * fork file has been successfully removed. It's fine if the file
+			 * does not exist. Since we have successfully removed the storage
+			 * file, it's no big deal if the mark file can't be removed. It
+			 * will be eventually removed during a future startup. If that
+			 * removal fails, the leftover mark file prevents the creation of
+			 * the corresponding storage file so that mark files won't result
+			 * in unexpected removal of the correct storage files.
+			 */
 			if (errno != ENOENT)
 				ereport(WARNING,
 						(errcode_for_file_access(),
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 27782237d0..e9e4bafb01 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -418,6 +418,22 @@ extractPageInfo(XLogReaderState *record)
 		 * source system.
 		 */
 	}
+	else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_UNLINK)
+	{
+		/*
+		 * We can safely ignore these.  We'll see that the files don't exist
+		 * in the target data dir, and copy them in from the source system. No
+		 * need to do anything special here.
+		 */
+	}
+	else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_MARK)
+	{
+		/*
+		 * We can safely ignore these.  The file will be removed from the
+		 * target if it doesn't exist in the source system.  The files are
+		 * empty, so we don't need to bother with their content.
+		 */
+	}
 	else if (rmid == RM_XACT_ID &&
 			 ((rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT ||
 			  (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT_PREPARED ||
diff --git a/src/common/relpath.c b/src/common/relpath.c
index 87de5f6c96..b1f6832cfa 100644
--- a/src/common/relpath.c
+++ b/src/common/relpath.c
@@ -139,9 +139,15 @@ GetDatabasePath(Oid dbOid, Oid spcOid)
  */
 char *
 GetRelationPath(Oid dbOid, Oid spcOid, RelFileNumber relNumber,
-				int backendId, ForkNumber forkNumber)
+				int backendId, ForkNumber forkNumber, char mark)
 {
 	char	   *path;
+	char		markstr[4];
+
+	if (mark == 0)
+		markstr[0] = 0;
+	else
+		snprintf(markstr, sizeof(markstr), ".%c", mark);
 
 	if (spcOid == GLOBALTABLESPACE_OID)
 	{
@@ -149,10 +155,10 @@ GetRelationPath(Oid dbOid, Oid spcOid, RelFileNumber relNumber,
 		Assert(dbOid == 0);
 		Assert(backendId == InvalidBackendId);
 		if (forkNumber != MAIN_FORKNUM)
-			path = psprintf("global/%u_%s",
-							relNumber, forkNames[forkNumber]);
+			path = psprintf("global/%u_%s%s",
+							relNumber, forkNames[forkNumber], markstr);
 		else
-			path = psprintf("global/%u", relNumber);
+			path = psprintf("global/%u%s", relNumber, markstr);
 	}
 	else if (spcOid == DEFAULTTABLESPACE_OID)
 	{
@@ -160,22 +166,22 @@ GetRelationPath(Oid dbOid, Oid spcOid, RelFileNumber relNumber,
 		if (backendId == InvalidBackendId)
 		{
 			if (forkNumber != MAIN_FORKNUM)
-				path = psprintf("base/%u/%u_%s",
+				path = psprintf("base/%u/%u_%s%s",
 								dbOid, relNumber,
-								forkNames[forkNumber]);
+								forkNames[forkNumber], markstr);
 			else
-				path = psprintf("base/%u/%u",
-								dbOid, relNumber);
+				path = psprintf("base/%u/%u%s",
+								dbOid, relNumber, markstr);
 		}
 		else
 		{
 			if (forkNumber != MAIN_FORKNUM)
-				path = psprintf("base/%u/t%d_%u_%s",
+				path = psprintf("base/%u/t%d_%u_%s%s",
 								dbOid, backendId, relNumber,
-								forkNames[forkNumber]);
+								forkNames[forkNumber], markstr);
 			else
-				path = psprintf("base/%u/t%d_%u",
-								dbOid, backendId, relNumber);
+				path = psprintf("base/%u/t%d_%u%s",
+								dbOid, backendId, relNumber, markstr);
 		}
 	}
 	else
@@ -184,27 +190,28 @@ GetRelationPath(Oid dbOid, Oid spcOid, RelFileNumber relNumber,
 		if (backendId == InvalidBackendId)
 		{
 			if (forkNumber != MAIN_FORKNUM)
-				path = psprintf("pg_tblspc/%u/%s/%u/%u_%s",
+				path = psprintf("pg_tblspc/%u/%s/%u/%u_%s%s",
 								spcOid, TABLESPACE_VERSION_DIRECTORY,
 								dbOid, relNumber,
-								forkNames[forkNumber]);
+								forkNames[forkNumber], markstr);
 			else
-				path = psprintf("pg_tblspc/%u/%s/%u/%u",
+				path = psprintf("pg_tblspc/%u/%s/%u/%u%s",
 								spcOid, TABLESPACE_VERSION_DIRECTORY,
-								dbOid, relNumber);
+								dbOid, relNumber, markstr);
 		}
 		else
 		{
 			if (forkNumber != MAIN_FORKNUM)
-				path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u_%s",
+				path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u_%s%s",
 								spcOid, TABLESPACE_VERSION_DIRECTORY,
 								dbOid, backendId, relNumber,
-								forkNames[forkNumber]);
+								forkNames[forkNumber], markstr);
 			else
-				path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u",
+				path = psprintf("pg_tblspc/%u/%s/%u/t%d_%u%s",
 								spcOid, TABLESPACE_VERSION_DIRECTORY,
-								dbOid, backendId, relNumber);
+								dbOid, backendId, relNumber, markstr);
 		}
 	}
+
 	return path;
 }
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index 45a3c7835c..0b39c6ef56 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -25,6 +25,8 @@ extern PGDLLIMPORT int wal_skip_threshold;
 extern SMgrRelation RelationCreateStorage(RelFileLocator rlocator,
 										  char relpersistence,
 										  bool register_delete);
+extern void RelationCreateInitFork(Relation rel);
+extern void RelationDropInitFork(Relation rel);
 extern void RelationDropStorage(Relation rel);
 extern void RelationPreserveStorage(RelFileLocator rlocator, bool atCommit);
 extern void RelationPreTruncate(Relation rel);
@@ -43,6 +45,7 @@ extern void RestorePendingSyncs(char *startAddress);
 extern void smgrDoPendingDeletes(bool isCommit);
 extern void smgrDoPendingSyncs(bool isCommit, bool isParallelWorker);
 extern int	smgrGetPendingDeletes(bool forCommit, RelFileLocator **ptr);
+extern void smgrDoPendingCleanups(bool isCommit);
 extern void AtSubCommit_smgr(void);
 extern void AtSubAbort_smgr(void);
 extern void PostPrepare_smgr(void);
diff --git a/src/include/catalog/storage_xlog.h b/src/include/catalog/storage_xlog.h
index 6b0a7aa3df..a36646c6ee 100644
--- a/src/include/catalog/storage_xlog.h
+++ b/src/include/catalog/storage_xlog.h
@@ -18,17 +18,23 @@
 #include "lib/stringinfo.h"
 #include "storage/block.h"
 #include "storage/relfilelocator.h"
+#include "storage/smgr.h"
 
 /*
  * Declarations for smgr-related XLOG records
  *
- * Note: we log file creation and truncation here, but logging of deletion
- * actions is handled by xact.c, because it is part of transaction commit.
+ * Note: we log file creation, truncation and buffer persistence change here,
+ * but logging of deletion actions is handled mainly by xact.c, because it is
+ * part of transaction commit in most cases.  However, there's a case where
+ * init forks are deleted outside control of transaction.
  */
 
 /* XLOG gives us high 4 bits */
 #define XLOG_SMGR_CREATE	0x10
 #define XLOG_SMGR_TRUNCATE	0x20
+#define XLOG_SMGR_UNLINK	0x30
+#define XLOG_SMGR_MARK		0x40
+#define XLOG_SMGR_BUFPERSISTENCE	0x50
 
 typedef struct xl_smgr_create
 {
@@ -36,6 +42,26 @@ typedef struct xl_smgr_create
 	ForkNumber	forkNum;
 } xl_smgr_create;
 
+typedef struct xl_smgr_unlink
+{
+	RelFileLocator rlocator;
+	ForkNumber	forkNum;
+} xl_smgr_unlink;
+
+typedef enum smgr_mark_action
+{
+	XLOG_SMGR_MARK_CREATE = 'c',
+	XLOG_SMGR_MARK_UNLINK = 'u'
+} smgr_mark_action;
+
+typedef struct xl_smgr_mark
+{
+	RelFileLocator rlocator;
+	ForkNumber	forkNum;
+	StorageMarks mark;
+	smgr_mark_action action;
+} xl_smgr_mark;
+
 /* flags for xl_smgr_truncate */
 #define SMGR_TRUNCATE_HEAP		0x0001
 #define SMGR_TRUNCATE_VM		0x0002
@@ -51,6 +77,11 @@ typedef struct xl_smgr_truncate
 } xl_smgr_truncate;
 
 extern void log_smgrcreate(const RelFileLocator *rlocator, ForkNumber forkNum);
+extern void log_smgrunlink(const RelFileLocator *rlocator, ForkNumber forkNum);
+extern void log_smgrcreatemark(const RelFileLocator *rlocator,
+							   ForkNumber forkNum, StorageMarks mark);
+extern void log_smgrunlinkmark(const RelFileLocator *rlocator,
+							   ForkNumber forkNum, StorageMarks mark);
 
 extern void smgr_redo(XLogReaderState *record);
 extern void smgr_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/common/relpath.h b/src/include/common/relpath.h
index 511c21682e..28c9dbcd13 100644
--- a/src/include/common/relpath.h
+++ b/src/include/common/relpath.h
@@ -74,7 +74,7 @@ extern int	forkname_chars(const char *str, ForkNumber *fork);
 extern char *GetDatabasePath(Oid dbOid, Oid spcOid);
 
 extern char *GetRelationPath(Oid dbOid, Oid spcOid, RelFileNumber relNumber,
-							 int backendId, ForkNumber forkNumber);
+							 int backendId, ForkNumber forkNumber, char mark);
 
 /*
  * Wrapper macros for GetRelationPath.  Beware of multiple
@@ -84,7 +84,7 @@ extern char *GetRelationPath(Oid dbOid, Oid spcOid, RelFileNumber relNumber,
 /* First argument is a RelFileLocator */
 #define relpathbackend(rlocator, backend, forknum) \
 	GetRelationPath((rlocator).dbOid, (rlocator).spcOid, (rlocator).relNumber, \
-					backend, forknum)
+					backend, forknum, 0)
 
 /* First argument is a RelFileLocator */
 #define relpathperm(rlocator, forknum) \
@@ -94,4 +94,9 @@ extern char *GetRelationPath(Oid dbOid, Oid spcOid, RelFileNumber relNumber,
 #define relpath(rlocator, forknum) \
 	relpathbackend((rlocator).locator, (rlocator).backend, forknum)
 
+/* First argument is a RelFileLocatorBackend */
+#define markpath(rlocator, forknum, mark)								\
+	GetRelationPath((rlocator).locator.dbOid, (rlocator).locator.spcOid, \
+					(rlocator).locator.relNumber,						\
+					(rlocator).backend, forknum, mark)
 #endif							/* RELPATH_H */
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index f85de97d08..91612f2e42 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -179,6 +179,7 @@ extern void pg_flush_data(int fd, off_t offset, off_t nbytes);
 extern int	pg_truncate(const char *path, off_t length);
 extern void fsync_fname(const char *fname, bool isdir);
 extern int	fsync_fname_ext(const char *fname, bool isdir, bool ignore_perm, int elevel);
+extern int	fsync_parent_path(const char *fname, int elevel);
 extern int	durable_rename(const char *oldfile, const char *newfile, int elevel);
 extern int	durable_unlink(const char *fname, int elevel);
 extern void SyncDataDirectory(void);
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 8f32af9ef3..37de1a0d7b 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -23,6 +23,10 @@
 extern void mdinit(void);
 extern void mdopen(SMgrRelation reln);
 extern void mdclose(SMgrRelation reln, ForkNumber forknum);
+extern void mdcreatemark(SMgrRelation reln, ForkNumber forknum,
+						 StorageMarks mark, bool isRedo);
+extern void mdunlinkmark(SMgrRelation reln, ForkNumber forknum,
+						 StorageMarks mark, bool isRedo);
 extern void mdcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern bool mdexists(SMgrRelation reln, ForkNumber forknum);
 extern void mdunlink(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo);
@@ -41,12 +45,14 @@ extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber nblocks);
 extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
 
+extern void ForgetRelationForkSyncRequests(RelFileLocatorBackend rlocator,
+										   ForkNumber forknum);
 extern void ForgetDatabaseSyncRequests(Oid dbid);
 extern void DropRelationFiles(RelFileLocator *delrels, int ndelrels, bool isRedo);
 
 /* md sync callbacks */
 extern int	mdsyncfiletag(const FileTag *ftag, char *path);
-extern int	mdunlinkfiletag(const FileTag *ftag, char *path);
+extern int	mdunlinkfiletag(const FileTag *ftag, char *path, StorageMarks mark);
 extern bool mdfiletagmatches(const FileTag *ftag, const FileTag *candidate);
 
 #endif							/* MD_H */
diff --git a/src/include/storage/reinit.h b/src/include/storage/reinit.h
index e2bbb5abe9..119dac1505 100644
--- a/src/include/storage/reinit.h
+++ b/src/include/storage/reinit.h
@@ -16,14 +16,16 @@
 #define REINIT_H
 
 #include "common/relpath.h"
-
+#include "storage/smgr.h"
 
 extern void ResetUnloggedRelations(int op);
 extern bool parse_filename_for_nontemp_relation(const char *name,
 												int *relnumchars,
-												ForkNumber *fork);
+												ForkNumber *fork,
+												StorageMarks *mark);
 
 #define UNLOGGED_RELATION_CLEANUP		0x0001
-#define UNLOGGED_RELATION_INIT			0x0002
+#define UNLOGGED_RELATION_DROP_BUFFER	0x0002
+#define UNLOGGED_RELATION_INIT			0x0004
 
 #endif							/* REINIT_H */
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 0935144f42..da6e0f3d64 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -18,6 +18,18 @@
 #include "storage/block.h"
 #include "storage/relfilelocator.h"
 
+/*
+ * A storage mark is a file whose existence tells something about a storage
+ * file.  Such a file is named "<filename>.<mark>", where <mark> is one of the
+ * StorageMarks values.  Don't use digits for the mark character, because
+ * ".<digit>" suffixes denote segment files.
+ */
+typedef enum StorageMarks
+{
+	SMGR_MARK_NONE = 0,
+	SMGR_MARK_UNCOMMITTED = 'u' /* the file is not committed yet */
+} StorageMarks;
+
 /*
  * smgr.c maintains a table of SMgrRelation objects, which are essentially
  * cached file handles.  An SMgrRelation is created (if not already present)
@@ -87,7 +99,12 @@ extern void smgrcloseall(void);
 extern void smgrcloserellocator(RelFileLocatorBackend rlocator);
 extern void smgrrelease(SMgrRelation reln);
 extern void smgrreleaseall(void);
+extern void smgrcreatemark(SMgrRelation reln, ForkNumber forknum,
+						   StorageMarks mark, bool isRedo);
+extern void smgrunlinkmark(SMgrRelation reln, ForkNumber forknum,
+						   StorageMarks mark, bool isRedo);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
+extern void smgrunlink(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
 extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
diff --git a/src/test/recovery/t/013_crash_restart.pl b/src/test/recovery/t/013_crash_restart.pl
index 92e7b367df..9def8d2062 100644
--- a/src/test/recovery/t/013_crash_restart.pl
+++ b/src/test/recovery/t/013_crash_restart.pl
@@ -86,6 +86,24 @@ ok( pump_until(
 $killme_stdout = '';
 $killme_stderr = '';
 
+# Create a table that should *not* survive, but has rows.
+# The table's contents are required to cause access to the storage file
+# after a restart.
+$killme_stdin .= q[
+CREATE TABLE not_alive AS SELECT 1 as a;
+SELECT pg_relation_filepath('not_alive');
+];
+ok( pump_until(
+		$killme,         $psql_timeout,
+		\$killme_stdout, qr/[[:alnum:]\/]+[\r\n]$/m),
+	'added in-creation table');
+my $not_alive_relfile = $node->data_dir . "/" . $killme_stdout;
+chomp($not_alive_relfile);
+$killme_stdout = '';
+$killme_stderr = '';
+
+# The relfile must exist now
+ok ( -e $not_alive_relfile, 'relfile for in-creation table');
 
 # Start longrunning query in second session; its failure will signal that
 # crash-restart has occurred.  The initial wait for the trivial select is to
@@ -144,6 +162,9 @@ $killme->run();
 ($monitor_stdin, $monitor_stdout, $monitor_stderr) = ('', '', '');
 $monitor->run();
 
+# The relfile must have been removed due to the recent restart.
+ok ( ! -e $not_alive_relfile,
+	 'relfile for the in-creation table should be removed after restart');
 
 # Acquire pid of new backend
 $killme_stdin .= q[
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 097f42e1b3..747b7557dc 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1986,6 +1986,7 @@ PatternInfoArray
 Pattern_Prefix_Status
 Pattern_Type
 PendingFsyncEntry
+PendingMarkCleanup
 PendingRelDelete
 PendingRelSync
 PendingUnlinkEntry
@@ -2617,6 +2618,7 @@ StdRdOptIndexCleanup
 StdRdOptions
 Step
 StopList
+StorageMarks
 StrategyNumber
 StreamCtl
 String
@@ -3629,6 +3631,7 @@ registered_buffer
 regmatch_t
 regoff_t
 regproc
+relfile_entry
 relopt_bool
 relopt_enum
 relopt_enum_elt_def
@@ -3682,6 +3685,7 @@ slist_iter
 slist_mutable_iter
 slist_node
 slock_t
+smgr_mark_action
 socket_set
 socklen_t
 spgBulkDeleteState
@@ -3883,7 +3887,9 @@ xl_restore_point
 xl_running_xacts
 xl_seq_rec
 xl_smgr_create
+xl_smgr_mark
 xl_smgr_truncate
+xl_smgr_unlink
 xl_standby_lock
 xl_standby_locks
 xl_tblspc_create_rec
-- 
2.31.1

>From 3531ead1788045b602f43af06fc1ba3ddf74c46b Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota....@gmail.com>
Date: Wed, 15 Mar 2023 15:42:09 +0900
Subject: [PATCH v27 2/3] In-place table persistence change

Currently, ALTER TABLE SET LOGGED/UNLOGGED causes a large amount of file
I/O due to a heap rewrite, even though ALTER TABLE SET UNLOGGED does not
require any data rewrites.  In addition, this patch changes ALTER TABLE
SET LOGGED to emit XLOG_FPI records instead of a large number of
HEAP_INSERT's when wal_level > minimal, as this option is likely to be
less resource intensive.
---
 src/backend/access/rmgrdesc/smgrdesc.c   |  12 +
 src/backend/catalog/storage.c            | 290 ++++++++++++++++++++++-
 src/backend/commands/tablecmds.c         | 269 ++++++++++++++++++---
 src/backend/storage/buffer/bufmgr.c      |  85 +++++++
 src/backend/storage/file/reinit.c        |  51 +++-
 src/bin/pg_rewind/parsexlog.c            |   6 +
 src/bin/pg_rewind/pg_rewind.c            |   1 -
 src/include/catalog/storage_xlog.h       |   8 +
 src/include/storage/bufmgr.h             |   2 +
 src/test/recovery/t/013_crash_restart.pl |  21 --
 src/tools/pgindent/typedefs.list         |   1 +
 11 files changed, 673 insertions(+), 73 deletions(-)

diff --git a/src/backend/access/rmgrdesc/smgrdesc.c b/src/backend/access/rmgrdesc/smgrdesc.c
index f8187385c4..e2998a3ee4 100644
--- a/src/backend/access/rmgrdesc/smgrdesc.c
+++ b/src/backend/access/rmgrdesc/smgrdesc.c
@@ -71,6 +71,15 @@ smgr_desc(StringInfo buf, XLogReaderState *record)
 		appendStringInfo(buf, "%s %s", action, path);
 		pfree(path);
 	}
+	else if (info == XLOG_SMGR_BUFPERSISTENCE)
+	{
+		xl_smgr_bufpersistence *xlrec = (xl_smgr_bufpersistence *) rec;
+		char	   *path = relpathperm(xlrec->rlocator, MAIN_FORKNUM);
+
+		appendStringInfoString(buf, path);
+		appendStringInfo(buf, " persistence %d", xlrec->persistence);
+		pfree(path);
+	}
 }
 
 const char *
@@ -92,6 +101,9 @@ smgr_identify(uint8 info)
 		case XLOG_SMGR_MARK:
 			id = "MARK";
 			break;
+		case XLOG_SMGR_BUFPERSISTENCE:
+			id = "BUFPERSISTENCE";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index 03e06246be..97d1230ee8 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -69,11 +69,13 @@ typedef struct PendingRelDelete
 
 #define	PCOP_UNLINK_FORK		(1 << 0)
 #define	PCOP_UNLINK_MARK		(1 << 1)
+#define	PCOP_SET_PERSISTENCE	(1 << 2)
 
 typedef struct PendingCleanup
 {
 	RelFileLocator rlocator;	/* relation that need a cleanup */
 	int			op;				/* operation mask */
+	bool		bufpersistence; /* buffer persistence to set */
 	ForkNumber	unlink_forknum; /* forknum to unlink */
 	StorageMarks unlink_mark;	/* mark to unlink */
 	BackendId	backend;		/* InvalidBackendId if not a temp rel */
@@ -223,6 +225,202 @@ RelationCreateStorage(RelFileLocator rlocator, char relpersistence,
 	return srel;
 }
 
+/*
+ * RelationCreateInitFork
+ *		Create physical storage for the init fork of a relation.
+ *
+ * If an unlink of the init fork is already pending, it is cancelled instead.
+ *
+ * This function is transactional. The creation is WAL-logged, and if the
+ * transaction aborts later on, the init fork will be removed.
+ */
+void
+RelationCreateInitFork(Relation rel)
+{
+	RelFileLocator rlocator = rel->rd_locator;
+	PendingCleanup *pending;
+	PendingCleanup *prev;
+	PendingCleanup *next;
+	SMgrRelation srel;
+	bool		create = true;
+
+	/* switch buffer persistence */
+	SetRelationBuffersPersistence(RelationGetSmgr(rel), false, false);
+
+	/*
+	 * If we have a pending-unlink for the init-fork of this relation, that
+	 * means the init-fork has existed since before the current transaction
+	 * started. This function reverts that change just by removing the entry.
+	 * See RelationDropInitFork.
+	 */
+	prev = NULL;
+	for (pending = pendingCleanups; pending != NULL; pending = next)
+	{
+		next = pending->next;
+
+		if (RelFileLocatorEquals(rlocator, pending->rlocator) &&
+			pending->unlink_forknum == INIT_FORKNUM)
+		{
+			if (prev)
+				prev->next = next;
+			else
+				pendingCleanups = next;
+
+			pfree(pending);
+			/* prev does not change */
+
+			create = false;
+		}
+		else
+			prev = pending;
+	}
+
+	if (!create)
+		return;
+
+	/* create the init fork, along with the mark file */
+	srel = smgropen(rlocator, InvalidBackendId);
+	log_smgrcreatemark(&rlocator, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED);
+	smgrcreatemark(srel, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+
+	/* There is no existing init fork, so create it. */
+	smgrcreate(srel, INIT_FORKNUM, false);
+
+	/*
+	 * For index relations, WAL-logging and file sync are performed by
+	 * ambuildempty. On the other hand, we manually perform these tasks here
+	 * for heap relations.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_INDEX)
+		rel->rd_indam->ambuildempty(rel);
+	else
+	{
+		log_smgrcreate(&rlocator, INIT_FORKNUM);
+		smgrimmedsync(srel, INIT_FORKNUM);
+	}
+
+	/* drop the init fork, mark file then revert persistence at abort */
+	pending = (PendingCleanup *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+	pending->rlocator = rlocator;
+	pending->op = PCOP_UNLINK_FORK | PCOP_UNLINK_MARK | PCOP_SET_PERSISTENCE;
+	pending->unlink_forknum = INIT_FORKNUM;
+	pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+	pending->bufpersistence = true;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = false;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingCleanups;
+	pendingCleanups = pending;
+
+	/* drop mark file at commit */
+	pending = (PendingCleanup *)
+		MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+	pending->rlocator = rlocator;
+	pending->op = PCOP_UNLINK_MARK;
+	pending->unlink_forknum = INIT_FORKNUM;
+	pending->unlink_mark = SMGR_MARK_UNCOMMITTED;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = true;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingCleanups;
+	pendingCleanups = pending;
+}
+
+/*
+ * RelationDropInitFork
+ *		Delete physical storage for the init fork of a relation.
+ */
+void
+RelationDropInitFork(Relation rel)
+{
+	RelFileLocator rlocator = rel->rd_locator;
+	PendingCleanup *pending;
+	PendingCleanup *prev;
+	PendingCleanup *next;
+	bool		inxact_created = false;
+
+	/* switch buffer persistence */
+	SetRelationBuffersPersistence(RelationGetSmgr(rel), true, false);
+
+	/*
+	 * Search for pending cleanups registered for the init fork of the
+	 * relation.  The presence of one indicates that the init fork was created
+	 * within the current transaction; such entries are cancelled here.
+	 */
+	prev = NULL;
+	for (pending = pendingCleanups; pending != NULL; pending = next)
+	{
+		next = pending->next;
+
+		if (RelFileLocatorEquals(rlocator, pending->rlocator) &&
+			pending->unlink_forknum == INIT_FORKNUM)
+		{
+			/* unlink list entry */
+			if (prev)
+				prev->next = next;
+			else
+				pendingCleanups = next;
+
+			pfree(pending);
+			/* prev does not change */
+
+			inxact_created = true;
+		}
+		else
+			prev = pending;
+	}
+
+	/*
+	 * If the init fork was created in this transaction, remove both the init
+	 * fork and its mark file right away.  Otherwise register an at-commit
+	 * pending-unlink for the existing init fork (see RelationCreateInitFork);
+	 * entries are zero-allocated so that fields left unset are well-defined.
+	 */
+	if (inxact_created)
+	{
+		SMgrRelation srel = smgropen(rlocator, InvalidBackendId);
+		ForkNumber	 forknum = INIT_FORKNUM;
+		BlockNumber	 firstblock = 0;
+
+		/*
+		 * Some AMs initialize INIT fork via buffer manager. To properly drop
+		 * the init fork, we need to drop all buffers for the INIT fork first,
+		 * then unlink the INIT fork along with the mark file.
+		 */
+		DropRelationBuffers(srel, &forknum, 1, &firstblock);
+		log_smgrunlinkmark(&rlocator, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED);
+		smgrunlinkmark(srel, INIT_FORKNUM, SMGR_MARK_UNCOMMITTED, false);
+		log_smgrunlink(&rlocator, INIT_FORKNUM);
+		smgrunlink(srel, INIT_FORKNUM, false);
+		return;
+	}
+
+	/* register drop of this init fork file at commit */
+	pending = (PendingCleanup *)
+		MemoryContextAllocZero(TopMemoryContext, sizeof(PendingCleanup));
+	pending->rlocator = rlocator;
+	pending->op = PCOP_UNLINK_FORK;
+	pending->unlink_forknum = INIT_FORKNUM;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = true;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingCleanups;
+	pendingCleanups = pending;
+
+	/* revert buffer-persistence changes at abort */
+	pending = (PendingCleanup *)
+		MemoryContextAllocZero(TopMemoryContext, sizeof(PendingCleanup));
+	pending->rlocator = rlocator;
+	pending->op = PCOP_SET_PERSISTENCE;
+	pending->bufpersistence = false;
+	pending->backend = InvalidBackendId;
+	pending->atCommit = false;
+	pending->nestLevel = GetCurrentTransactionNestLevel();
+	pending->next = pendingCleanups;
+	pendingCleanups = pending;
+}
+
 /*
  * Perform XLogInsert of an XLOG_SMGR_CREATE record to WAL.
  */
@@ -305,6 +503,25 @@ log_smgrunlinkmark(const RelFileLocator *rlocator, ForkNumber forkNum,
 	XLogInsert(RM_SMGR_ID, XLOG_SMGR_MARK | XLR_SPECIAL_REL_UPDATE);
 }
 
+/*
+ * Perform XLogInsert of an XLOG_SMGR_BUFPERSISTENCE record to WAL.
+ */
+void
+log_smgrbufpersistence(const RelFileLocator *rlocator, bool persistence)
+{
+	xl_smgr_bufpersistence xlrec;
+
+	/*
+	 * Make an XLOG entry reporting the change of buffer persistence.
+	 */
+	xlrec.rlocator = *rlocator;
+	xlrec.persistence = persistence;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+	XLogInsert(RM_SMGR_ID, XLOG_SMGR_BUFPERSISTENCE | XLR_SPECIAL_REL_UPDATE);
+}
+
 /*
  * RelationDropStorage
  *		Schedule unlinking of physical storage at transaction commit.
@@ -858,10 +1075,28 @@ smgrDoPendingCleanups(bool isCommit)
 				srel = smgropen(pending->rlocator, pending->backend);
 
 				Assert((pending->op &
-						~(PCOP_UNLINK_FORK | PCOP_UNLINK_MARK)) == 0);
+						~(PCOP_UNLINK_FORK | PCOP_UNLINK_MARK |
+						  PCOP_SET_PERSISTENCE)) == 0);
+
+				if (pending->op & PCOP_SET_PERSISTENCE)
+				{
+					SetRelationBuffersPersistence(srel, pending->bufpersistence,
+												  InRecovery);
+				}
 
 				if (pending->op & PCOP_UNLINK_FORK)
 				{
+					/*
+					 * Unlink the fork file. Currently we only apply this
+					 * operation for init forks and it is certain that the init
+					 * fork is not loaded on shared buffers at this point.  In
+					 * the case of RelationDropInitFork, the function should
+					 * have dropped buffers. In the case of
+					 * RelationCreateInitFork, PCOP_SET_PERSISTENCE is set and
+					 * the buffers were dropped just before.
+					 */
+					Assert(pending->unlink_forknum == INIT_FORKNUM);
+
 					/* Don't emit wal while recovery. */
 					if (!InRecovery)
 						log_smgrunlink(&pending->rlocator,
@@ -1311,6 +1546,59 @@ smgr_redo(XLogReaderState *record)
 			}
 		}
 	}
+	else if (info == XLOG_SMGR_BUFPERSISTENCE)
+	{
+		xl_smgr_bufpersistence *xlrec =
+		(xl_smgr_bufpersistence *) XLogRecGetData(record);
+		SMgrRelation reln;
+		PendingCleanup *pending;
+		PendingCleanup *prev = NULL;
+
+		reln = smgropen(xlrec->rlocator, InvalidBackendId);
+		SetRelationBuffersPersistence(reln, xlrec->persistence, true);
+
+		/*
+		 * Delete pending action for persistence change if any. We should have
+		 * at most one entry for this action.
+		 */
+		for (pending = pendingCleanups; pending != NULL;
+			 pending = pending->next)
+		{
+			if (RelFileLocatorEquals(xlrec->rlocator, pending->rlocator) &&
+				(pending->op & PCOP_SET_PERSISTENCE) != 0)
+			{
+				Assert(pending->bufpersistence == xlrec->persistence);
+
+				if (prev)
+					prev->next = pending->next;
+				else
+					pendingCleanups = pending->next;
+
+				pfree(pending);
+				break;
+			}
+
+			prev = pending;
+		}
+
+		/*
+		 * At abort time, revert any changes to buffer-persistence that were
+		 * made in this transaction.
+		 */
+		if (!pending)
+		{
+			pending = (PendingCleanup *)
+				MemoryContextAlloc(TopMemoryContext, sizeof(PendingCleanup));
+			pending->rlocator = xlrec->rlocator;
+			pending->op = PCOP_SET_PERSISTENCE;
+			pending->bufpersistence = !xlrec->persistence;
+			pending->backend = InvalidBackendId;
+			pending->atCommit = false;
+			pending->nestLevel = GetCurrentTransactionNestLevel();
+			pending->next = pendingCleanups;
+			pendingCleanups = pending;
+		}
+	}
 	else
 		elog(PANIC, "smgr_redo: unknown op code %u", info);
 }
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3e2c5f797c..becef96927 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -55,6 +55,7 @@
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/policy.h"
+#include "commands/progress.h"
 #include "commands/sequence.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
@@ -5439,6 +5440,189 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	return newcmd;
 }
 
+/*
+ * RelationChangePersistence: perform in-place persistence change of a relation
+ *
+ * Switches tab->relid, its indexes, and its toast relation (with the toast
+ * indexes) to 'persistence' in place.  Indexes whose access method is not on
+ * the positive list below are handed to reindex_index() instead.
+ */
+static void
+RelationChangePersistence(AlteredTableInfo *tab, char persistence,
+						  LOCKMODE lockmode)
+{
+	Relation	rel;
+	Relation	classRel;
+	HeapTuple	tuple,
+				newtuple;
+	Datum		new_val[Natts_pg_class];
+	bool		new_null[Natts_pg_class],
+				new_repl[Natts_pg_class];
+	int			i;
+	List	   *relids;
+	ListCell   *lc_oid;
+
+	Assert(tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE);
+	Assert(lockmode == AccessExclusiveLock);
+
+	/*
+	 * ATRewriteTable must be used instead of this function if any of the
+	 * following is set.
+	 */
+	Assert(tab->constraints == NULL && tab->partition_constraint == NULL &&
+		   tab->newvals == NULL && !tab->verify_new_notnull);
+
+	rel = table_open(tab->relid, lockmode);
+
+	Assert(rel->rd_rel->relpersistence != persistence);
+
+	elog(DEBUG1, "perform in-place persistence change");
+
+	/* Gather the relation and its indexes; the relation itself goes first */
+	relids = RelationGetIndexList(rel);
+	relids = lcons_oid(rel->rd_id, relids);
+
+	/* Add toast relation if any */
+	if (OidIsValid(rel->rd_rel->reltoastrelid))
+	{
+		List	   *toastidx;
+		Relation	toastrel = table_open(rel->rd_rel->reltoastrelid, lockmode);
+
+		relids = lappend_oid(relids, rel->rd_rel->reltoastrelid);
+		toastidx = RelationGetIndexList(toastrel);
+		relids = list_concat(relids, toastidx);
+		list_free(toastidx);
+		table_close(toastrel, NoLock);
+	}
+
+	table_close(rel, NoLock);
+
+	/* Make changes in storage */
+	classRel = table_open(RelationRelationId, RowExclusiveLock);
+
+	foreach(lc_oid, relids)
+	{
+		Oid			reloid = lfirst_oid(lc_oid);
+		Relation	r = relation_open(reloid, lockmode);
+
+		/*
+		 * XXX: Some access methods do not support in-place persistence
+		 * changes. GiST uses page LSNs to figure out whether a block has been
+		 * modified. However UNLOGGED GiST indexes use fake LSNs that are
+		 * incompatible with the real LSNs used for LOGGED indexes.
+		 *
+		 * Maybe if gistGetFakeLSN behaved the same way for permanent and
+		 * unlogged indexes, we could potentially avoid index rebuilds in
+		 * exchange for emitting some extra WAL records while the index is
+		 * unlogged.
+		 *
+		 * Check relam against a positive list so that we take the hard way for
+		 * unknown AMs.
+		 */
+		if (r->rd_rel->relkind == RELKIND_INDEX &&
+		/* GiST is excluded */
+			r->rd_rel->relam != BTREE_AM_OID &&
+			r->rd_rel->relam != HASH_AM_OID &&
+			r->rd_rel->relam != GIN_AM_OID &&
+			r->rd_rel->relam != SPGIST_AM_OID &&
+			r->rd_rel->relam != BRIN_AM_OID)
+		{
+			int			reindex_flags;
+			ReindexParams params = {0};
+
+			/* reindex doesn't allow concurrent use of the index */
+			relation_close(r, NoLock);
+
+			reindex_flags =
+				REINDEX_REL_SUPPRESS_INDEX_USE |
+				REINDEX_REL_CHECK_CONSTRAINTS;
+
+			/* Give the index the same persistence as the parent relation. */
+			if (persistence == RELPERSISTENCE_UNLOGGED)
+				reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED;
+			else
+				reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT;
+
+			/* NOTE(review): verify reindex_index() takes REINDEX_REL_* flags */
+			reindex_index(reloid, reindex_flags, persistence, &params);
+			continue;
+		}
+
+		/* Create or drop init fork */
+		if (persistence == RELPERSISTENCE_UNLOGGED)
+			RelationCreateInitFork(r);
+		else
+			RelationDropInitFork(r);
+
+		/*
+		 * If this relation is changed to WAL-logged, immediately sync all
+		 * files except for init fork to establish the initial state on
+		 * storage.  The buffers should have already been flushed out by
+		 * RelationCreate(Drop)InitFork called immediately above. The init fork
+		 * should have already been synchronized as needed.
+		 */
+		if (persistence == RELPERSISTENCE_PERMANENT)
+		{
+			for (i = 0; i < INIT_FORKNUM; i++)
+			{
+				if (smgrexists(RelationGetSmgr(r), i))
+					smgrimmedsync(RelationGetSmgr(r), i);
+			}
+		}
+
+		/* Update catalog */
+		tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(reloid));
+		if (!HeapTupleIsValid(tuple))
+			elog(ERROR, "cache lookup failed for relation %u", reloid);
+
+		memset(new_val, 0, sizeof(new_val));
+		memset(new_null, false, sizeof(new_null));
+		memset(new_repl, false, sizeof(new_repl));
+
+		new_val[Anum_pg_class_relpersistence - 1] = CharGetDatum(persistence);
+		new_null[Anum_pg_class_relpersistence - 1] = false;
+		new_repl[Anum_pg_class_relpersistence - 1] = true;
+
+		newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
+									 new_val, new_null, new_repl);
+		CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
+		heap_freetuple(newtuple);
+		heap_freetuple(tuple);
+
+		/*
+		 * If wal_level >= replica, switching to LOGGED requires the relation
+		 * content to be WAL-logged for later recovery. We don't emit this if
+		 * wal_level = minimal.
+		 */
+		if (persistence == RELPERSISTENCE_PERMANENT && XLogIsNeeded())
+		{
+			ForkNumber	fork;
+			xl_smgr_truncate xlrec;
+
+			xlrec.blkno = 0;
+			xlrec.rlocator = r->rd_locator;
+			xlrec.flags = SMGR_TRUNCATE_ALL;
+
+			XLogBeginInsert();
+			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+
+			XLogInsert(RM_SMGR_ID, XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
+
+			for (fork = 0; fork < INIT_FORKNUM; fork++)
+			{
+				if (smgrexists(RelationGetSmgr(r), fork))
+					log_newpage_range(r, fork, 0,
+									  smgrnblocks(RelationGetSmgr(r), fork),
+									  false);
+			}
+		}
+
+		relation_close(r, NoLock);
+	}
+
+	table_close(classRel, NoLock);
+}
+
 /*
  * ATRewriteTables: ALTER TABLE phase 3
  */
@@ -5569,48 +5753,55 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
 										 tab->relid,
 										 tab->rewrite);
 
-			/*
-			 * Create transient table that will receive the modified data.
-			 *
-			 * Ensure it is marked correctly as logged or unlogged.  We have
-			 * to do this here so that buffers for the new relfilenumber will
-			 * have the right persistence set, and at the same time ensure
-			 * that the original filenumbers's buffers will get read in with
-			 * the correct setting (i.e. the original one).  Otherwise a
-			 * rollback after the rewrite would possibly result with buffers
-			 * for the original filenumbers having the wrong persistence
-			 * setting.
-			 *
-			 * NB: This relies on swap_relation_files() also swapping the
-			 * persistence. That wouldn't work for pg_class, but that can't be
-			 * unlogged anyway.
-			 */
-			OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, NewAccessMethod,
-									   persistence, lockmode);
+			if (tab->rewrite == AT_REWRITE_ALTER_PERSISTENCE)
+				RelationChangePersistence(tab, persistence, lockmode);
+			else
+			{
+				/*
+				 * Create transient table that will receive the modified data.
+				 *
+				 * Ensure it is marked correctly as logged or unlogged.  We
+				 * have to do this here so that buffers for the new
+				 * relfilenumber will have the right persistence set, and at
+				 * the same time ensure that the original filenumbers's buffers
+				 * will get read in with the correct setting (i.e. the original
+				 * one).  Otherwise a rollback after the rewrite would possibly
+				 * result with buffers for the original filenumbers having the
+				 * wrong persistence setting.
+				 *
+				 * NB: This relies on swap_relation_files() also swapping the
+				 * persistence. That wouldn't work for pg_class, but that
+				 * can't be unlogged anyway.
+				 */
+				OIDNewHeap = make_new_heap(tab->relid, NewTableSpace,
+										   NewAccessMethod,
+										   persistence, lockmode);
 
-			/*
-			 * Copy the heap data into the new table with the desired
-			 * modifications, and test the current data within the table
-			 * against new constraints generated by ALTER TABLE commands.
-			 */
-			ATRewriteTable(tab, OIDNewHeap, lockmode);
+				/*
+				 * Copy the heap data into the new table with the desired
+				 * modifications, and test the current data within the table
+				 * against new constraints generated by ALTER TABLE commands.
+				 */
+				ATRewriteTable(tab, OIDNewHeap, lockmode);
 
-			/*
-			 * Swap the physical files of the old and new heaps, then rebuild
-			 * indexes and discard the old heap.  We can use RecentXmin for
-			 * the table's new relfrozenxid because we rewrote all the tuples
-			 * in ATRewriteTable, so no older Xid remains in the table.  Also,
-			 * we never try to swap toast tables by content, since we have no
-			 * interest in letting this code work on system catalogs.
-			 */
-			finish_heap_swap(tab->relid, OIDNewHeap,
-							 false, false, true,
-							 !OidIsValid(tab->newTableSpace),
-							 RecentXmin,
-							 ReadNextMultiXactId(),
-							 persistence);
+				/*
+				 * Swap the physical files of the old and new heaps, then
+				 * rebuild indexes and discard the old heap.  We can use
+				 * RecentXmin for the table's new relfrozenxid because we
+				 * rewrote all the tuples in ATRewriteTable, so no older Xid
+				 * remains in the table.  Also, we never try to swap toast
+				 * tables by content, since we have no interest in letting
+				 * this code work on system catalogs.
+				 */
+				finish_heap_swap(tab->relid, OIDNewHeap,
+								 false, false, true,
+								 !OidIsValid(tab->newTableSpace),
+								 RecentXmin,
+								 ReadNextMultiXactId(),
+								 persistence);
 
-			InvokeObjectPostAlterHook(RelationRelationId, tab->relid, 0);
+				InvokeObjectPostAlterHook(RelationRelationId, tab->relid, 0);
+			}
 		}
 		else if (tab->rewrite > 0 && tab->relkind == RELKIND_SEQUENCE)
 		{
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 0a05577b68..2b00ec3eed 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3240,6 +3240,91 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
 	}
 }
 
+/* ---------------------------------------------------------------------
+ *		SetRelationBuffersPersistence
+ *
+ *		Set or clear BM_PERMANENT on all shared buffers of a relation.  When
+ *		switching to permanent, buffers of the init fork are invalidated and
+ *		valid dirty buffers are written out to disk (or more precisely, to
+ *		kernel disk buffers).  Unless isRedo, the change is WAL-logged first.
+ *
+ *		The caller must be holding AccessExclusiveLock on the target relation
+ *		to ensure that no other backend is busy dirtying more blocks of the
+ *		relation.
+ *
+ *		XXX currently it sequentially searches the buffer pool, should be
+ *		changed to more clever ways of searching.  This routine is not used in
+ *		any performance-critical code paths, so it's not worth additional
+ *		overhead to make it go faster; but see also DropRelationBuffers.
+ *		--------------------------------------------------------------------
+ */
+void
+SetRelationBuffersPersistence(SMgrRelation srel, bool permanent, bool isRedo)
+{
+	int			i;
+	RelFileLocatorBackend rlocator = srel->smgr_rlocator;
+
+	Assert(!RelFileLocatorBackendIsTemp(rlocator));
+
+	if (!isRedo)
+		log_smgrbufpersistence(srel->smgr_rlocator.locator, permanent);
+
+	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+	for (i = 0; i < NBuffers; i++)
+	{
+		BufferDesc *bufHdr = GetBufferDescriptor(i);
+		uint32		buf_state;
+
+		if (!RelFileLocatorEquals(BufTagGetRelFileLocator(&bufHdr->tag),
+								  rlocator.locator))
+			continue;
+
+		ReservePrivateRefCountEntry();
+
+		buf_state = LockBufHdr(bufHdr);
+
+		if (!RelFileLocatorEquals(BufTagGetRelFileLocator(&bufHdr->tag),
+								  rlocator.locator))
+		{
+			UnlockBufHdr(bufHdr, buf_state);
+			continue;
+		}
+
+		if (permanent)
+		{
+			/* The init fork is being dropped, drop buffers for it. */
+			if (BufTagGetForkNum(&bufHdr->tag) == INIT_FORKNUM)
+			{
+				InvalidateBuffer(bufHdr);
+				continue;
+			}
+
+			buf_state |= BM_PERMANENT;
+			pg_atomic_write_u32(&bufHdr->state, buf_state);
+
+			/* flush this buffer when switching to PERMANENT */
+			if ((buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
+			{
+				PinBuffer_Locked(bufHdr);
+				LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
+							  LW_SHARED);
+				FlushBuffer(bufHdr, srel);
+				LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+				UnpinBuffer(bufHdr);
+			}
+			else
+				UnlockBufHdr(bufHdr, buf_state);
+		}
+		else
+		{
+			/* No init fork buffers expected; mark the buffer non-permanent */
+			Assert(BufTagGetForkNum(&bufHdr->tag) != INIT_FORKNUM);
+			UnlockBufHdr(bufHdr, buf_state & ~BM_PERMANENT);
+		}
+	}
+}
+
 /* ---------------------------------------------------------------------
  *		DropRelationsAllBuffers
  *
diff --git a/src/backend/storage/file/reinit.c b/src/backend/storage/file/reinit.c
index 250cfe9e44..bdd1200132 100644
--- a/src/backend/storage/file/reinit.c
+++ b/src/backend/storage/file/reinit.c
@@ -38,6 +38,7 @@ typedef struct
 {
  	RelFileNumber	relNumber;		/* hash key */
 	bool			has_init;		/* has INIT fork */
+	bool			dirty_init;		/* needs to remove INIT fork */
 	bool			dirty_all;		/* needs to remove all forks */
 }  relfile_entry;
 
@@ -45,7 +46,10 @@ typedef struct
  * Clean up and reset relation files from before the last restart.
  *
  * If op includes UNLOGGED_RELATION_CLEANUP, we perform different operations
- * depending on the existence of mark files.
+ * depending on the existence of the "cleanup" forks.
+ *
+ * If SMGR_MARK_UNCOMMITTED mark file for init fork is present, we remove the
+ * init fork along with the mark file.
  *
  * If SMGR_MARK_UNCOMMITTED mark file for main fork is present we remove the
  * whole relation along with the mark file.
@@ -54,7 +58,7 @@ typedef struct
  * with the "init" fork, except for the "init" fork itself.
  *
  * If op includes UNLOGGED_RELATION_DROP_BUFFER, we drop all buffers for all
- * relations that are to be cleaned up.
+ * relations that have the "cleanup" and/or the "init" forks.
  *
  * If op includes UNLOGGED_RELATION_INIT, we copy the "init" fork to the main
  * fork.
@@ -241,7 +245,7 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
 			 * Put the OID portion of the name into the hash table,
 			 * if it isn't already.  If it has SMGR_MARK_UNCOMMITTED mark
 			 * files, the storage file is in dirty state, where clean up is
-			 * needed.
+			 * needed.
 			 */
 			key = atooid(de->d_name);
 			ent = hash_search(hash, &key, HASH_ENTER, &found);
@@ -249,10 +253,13 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
 			if (!found)
 			{
 				ent->has_init = false;
+				ent->dirty_init = false;
 				ent->dirty_all = false;
 			}
 
-			if (forkNum == MAIN_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+			if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
+				ent->dirty_init = true;
+			else if (forkNum == MAIN_FORKNUM && mark == SMGR_MARK_UNCOMMITTED)
 				ent->dirty_all = true;
 			else
 			{
@@ -276,11 +283,10 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
 	{
 		/*
 		 * When we come here after recovery, smgr object for this file might
-		 * have been created. In that case we need to drop all buffers then the
-		 * smgr object.  Otherwise checkpointer wrongly tries to flush buffers
-		 * for nonexistent relation storage. This is safe as far as no other
-		 * backends have accessed the relation before starting archive
-		 * recovery.
+		 * have been created. In that case we need to drop all buffers then
+		 * the smgr object before initializing the unlogged relation.  This is
+		 * safe as far as no other backends have accessed the relation before
+		 * starting archive recovery.
 		 */
 		HASH_SEQ_STATUS status;
 		relfile_entry *ent;
@@ -296,6 +302,13 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
 		{
 			RelFileLocatorBackend rel;
 
+			/*
+			 * The relation is persistent and stays persistent. Don't drop the
+			 * buffers for this relation.
+			 */
+			if (ent->has_init && ent->dirty_init)
+				continue;
+
 			if (maxrels <= nrels)
 			{
 				maxrels *= 2;
@@ -352,8 +365,24 @@ ResetUnloggedRelationsInDbspaceDir(const char *dbspacedirname,
 				if (!ent->has_init)
 					continue;
 
-				if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_NONE)
-					continue;
+				if (ent->dirty_init)
+				{
+					/*
+					 * The crashed transaction did SET UNLOGGED. This relation
+					 * is restored to a LOGGED relation.
+					 */
+					if (forkNum != INIT_FORKNUM)
+						continue;
+				}
+				else
+				{
+					/*
+					 * We don't remove the INIT fork of a non-dirty
+					 * relation.
+					 */
+					if (forkNum == INIT_FORKNUM && mark == SMGR_MARK_NONE)
+						continue;
+				}
 			}
 
 			/* so, nuke it! */
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index e9e4bafb01..ddc8014e55 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -434,6 +434,12 @@ extractPageInfo(XLogReaderState *record)
 		 * empty so we don't need to bother the content.
 		 */
 	}
+	else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_BUFPERSISTENCE)
+	{
+		/*
+		 * We can safely ignore these. These don't make any on-disk changes.
+		 */
+	}
 	else if (rmid == RM_XACT_ID &&
 			 ((rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT ||
 			  (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT_PREPARED ||
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index f7f3b8227f..b3a1f255d7 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -460,7 +460,6 @@ main(int argc, char **argv)
 	if (showprogress)
 		pg_log_info("reading source file list");
 	source->traverse_files(source, &process_source_file);
-
 	if (showprogress)
 		pg_log_info("reading target file list");
 	traverse_datadir(datadir_target, &process_target_file);
diff --git a/src/include/catalog/storage_xlog.h b/src/include/catalog/storage_xlog.h
index a36646c6ee..6e79c68f5b 100644
--- a/src/include/catalog/storage_xlog.h
+++ b/src/include/catalog/storage_xlog.h
@@ -62,6 +62,12 @@ typedef struct xl_smgr_mark
 	smgr_mark_action action;
 } xl_smgr_mark;
 
+typedef struct xl_smgr_bufpersistence
+{
+	RelFileLocator rlocator;	/* relation whose buffers are changed */
+	bool		persistence;	/* true if buffers become permanent */
+} xl_smgr_bufpersistence;
+
 /* flags for xl_smgr_truncate */
 #define SMGR_TRUNCATE_HEAP		0x0001
 #define SMGR_TRUNCATE_VM		0x0002
@@ -82,6 +88,8 @@ extern void log_smgrcreatemark(const RelFileLocator *rlocator,
 							   ForkNumber forkNum, StorageMarks mark);
 extern void log_smgrunlinkmark(const RelFileLocator *rlocator,
 							   ForkNumber forkNum, StorageMarks mark);
+extern void log_smgrbufpersistence(const RelFileLocator *rlocator,
+								   bool persistence);
 
 extern void smgr_redo(XLogReaderState *record);
 extern void smgr_desc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index b8a18b8081..fd34810dc2 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -156,6 +156,8 @@ extern void DropRelationBuffers(struct SMgrRelationData *smgr_reln,
 								int nforks, BlockNumber *firstDelBlock);
 extern void DropRelationsAllBuffers(struct SMgrRelationData **smgr_reln,
 									int nlocators);
+extern void SetRelationBuffersPersistence(struct SMgrRelationData *srel,
+										  bool permanent, bool isRedo);
 extern void DropDatabaseBuffers(Oid dbid);
 
 #define RelationGetNumberOfBlocks(reln) \
diff --git a/src/test/recovery/t/013_crash_restart.pl b/src/test/recovery/t/013_crash_restart.pl
index 9def8d2062..92e7b367df 100644
--- a/src/test/recovery/t/013_crash_restart.pl
+++ b/src/test/recovery/t/013_crash_restart.pl
@@ -86,24 +86,6 @@ ok( pump_until(
 $killme_stdout = '';
 $killme_stderr = '';
 
-#create a table that should *not* survive, but has rows.
-#the table's contents is requried to cause access to the storage file
-#after a restart.
-$killme_stdin .= q[
-CREATE TABLE not_alive AS SELECT 1 as a;
-SELECT pg_relation_filepath('not_alive');
-];
-ok( pump_until(
-		$killme,         $psql_timeout,
-		\$killme_stdout, qr/[[:alnum:]\/]+[\r\n]$/m),
-	'added in-creation table');
-my $not_alive_relfile = $node->data_dir . "/" . $killme_stdout;
-chomp($not_alive_relfile);
-$killme_stdout = '';
-$killme_stderr = '';
-
-# The relfile must be exists now
-ok ( -e $not_alive_relfile, 'relfile for in-creation table');
 
 # Start longrunning query in second session; its failure will signal that
 # crash-restart has occurred.  The initial wait for the trivial select is to
@@ -162,9 +144,6 @@ $killme->run();
 ($monitor_stdin, $monitor_stdout, $monitor_stderr) = ('', '', '');
 $monitor->run();
 
-# The relfile must have been removed due to the recent restart.
-ok ( ! -e $not_alive_relfile,
-	 'relfile for the in-creation table should be removed after restart');
 
 # Acquire pid of new backend
 $killme_stdin .= q[
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 747b7557dc..8dbbb09e8c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3886,6 +3886,7 @@ xl_replorigin_set
 xl_restore_point
 xl_running_xacts
 xl_seq_rec
+xl_smgr_bufpersistence
 xl_smgr_create
 xl_smgr_mark
 xl_smgr_truncate
-- 
2.31.1

>From 6d8b4d8d1a34e4093f6c16d288aad80482d9122d Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota....@gmail.com>
Date: Wed, 15 Mar 2023 16:39:23 +0900
Subject: [PATCH v27 3/3] New command ALTER TABLE ALL IN TABLESPACE SET
 LOGGED/UNLOGGED

To ease invoking ALTER TABLE SET LOGGED/UNLOGGED, this command changes
relation persistence of all tables in the specified tablespace.
---
 doc/src/sgml/ref/alter_table.sgml        |  15 +++
 src/backend/catalog/storage.c            |   4 +-
 src/backend/commands/tablecmds.c         | 140 +++++++++++++++++++++++
 src/backend/parser/gram.y                |  42 +++++++
 src/backend/storage/buffer/bufmgr.c      |   2 +-
 src/backend/tcop/utility.c               |  11 ++
 src/include/catalog/storage_xlog.h       |   2 +-
 src/include/commands/tablecmds.h         |   2 +
 src/include/nodes/parsenodes.h           |  10 ++
 src/test/regress/expected/tablespace.out |  76 ++++++++++++
 src/test/regress/sql/tablespace.sql      |  41 +++++++
 11 files changed, 341 insertions(+), 4 deletions(-)

diff --git a/doc/src/sgml/ref/alter_table.sgml b/doc/src/sgml/ref/alter_table.sgml
index d4d93eeb7c..7ee09ca9cf 100644
--- a/doc/src/sgml/ref/alter_table.sgml
+++ b/doc/src/sgml/ref/alter_table.sgml
@@ -33,6 +33,8 @@ ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
     SET SCHEMA <replaceable class="parameter">new_schema</replaceable>
 ALTER TABLE ALL IN TABLESPACE <replaceable class="parameter">name</replaceable> [ OWNED BY <replaceable class="parameter">role_name</replaceable> [, ... ] ]
     SET TABLESPACE <replaceable class="parameter">new_tablespace</replaceable> [ NOWAIT ]
+ALTER TABLE ALL IN TABLESPACE <replaceable class="parameter">name</replaceable> [ OWNED BY <replaceable class="parameter">role_name</replaceable> [, ... ] ]
+    SET { LOGGED | UNLOGGED } [ NOWAIT ]
 ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
     ATTACH PARTITION <replaceable class="parameter">partition_name</replaceable> { FOR VALUES <replaceable class="parameter">partition_bound_spec</replaceable> | DEFAULT }
 ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
@@ -769,6 +771,19 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
       (for identity or serial columns).  However, it is also possible to
       change the persistence of such sequences separately.
      </para>
+     <para>
+      All tables in the current database in a tablespace can be changed by
+      using the <literal>ALL IN TABLESPACE</literal> form, which will first
+      lock all tables to be changed and then change each one.  This form also
+      supports
+      <literal>OWNED BY</literal>, which will only change tables owned by the
+      specified roles.  If the <literal>NOWAIT</literal> option is specified,
+      then the command will fail if it is unable to immediately acquire all of
+      the locks required.  The <literal>information_schema</literal> relations
+      are not considered part of the system catalogs and will be changed.  See
+      also
+      <link linkend="sql-createtablespace"><command>CREATE TABLESPACE</command></link>.
+     </para>
     </listitem>
    </varlistentry>
 
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index 97d1230ee8..38a88b1ccf 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -507,14 +507,14 @@ log_smgrunlinkmark(const RelFileLocator *rlocator, ForkNumber forkNum,
  * Perform XLogInsert of an XLOG_SMGR_BUFPERSISTENCE record to WAL.
  */
 void
-log_smgrbufpersistence(const RelFileLocator *rlocator, bool persistence)
+log_smgrbufpersistence(const RelFileLocator rlocator, bool persistence)
 {
 	xl_smgr_bufpersistence xlrec;
 
 	/*
 	 * Make an XLOG entry reporting the change of buffer persistence.
 	 */
-	xlrec.rlocator = *rlocator;
+	xlrec.rlocator = rlocator;
 	xlrec.persistence = persistence;
 
 	XLogBeginInsert();
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index becef96927..ab6ba6192d 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -14889,6 +14889,146 @@ AlterTableMoveAll(AlterTableMoveAllStmt *stmt)
 	return new_tablespaceoid;
 }
 
+/*
+ * Alter Table ALL ... SET LOGGED/UNLOGGED
+ *
+ * Allows a user to change persistence of all tables in a given tablespace in
+ * the current database.  Tables can also be chosen based on their owner, to
+ * allow users to change the persistence of only their own tables.  The main
+ * permissions handling is done by the lower-level change persistence
+ * function.
+ *
+ * All to-be-modified objects are locked first. If NOWAIT is specified and the
+ * lock can't be acquired then we ereport(ERROR).
+ */
+void
+AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt * stmt)
+{
+	List	   *relations = NIL;
+	ListCell   *l;
+	ScanKeyData key[1];
+	Relation	rel;
+	TableScanDesc scan;
+	HeapTuple	tuple;
+	Oid			tablespaceoid;
+	List	   *role_oids = roleSpecsToIds(stmt->roles);
+
+	/* Ensure we were not asked to change something we can't */
+	if (stmt->objtype != OBJECT_TABLE)
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("only tables can be specified"));
+
+	/* Get the tablespace OID */
+	tablespaceoid = get_tablespace_oid(stmt->tablespacename, false);
+
+	/*
+	 * Now that the checks are done, check if we should set it to InvalidOid
+	 * because it is our database's default tablespace.
+	 */
+	if (tablespaceoid == MyDatabaseTableSpace)
+		tablespaceoid = InvalidOid;
+
+	/*
+	 * Walk the list of objects in the tablespace to pick them up. This will
+	 * only find objects in our database, of course.
+	 */
+	ScanKeyInit(&key[0],
+				Anum_pg_class_reltablespace,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(tablespaceoid));
+
+	rel = table_open(RelationRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(rel, 1, key);
+	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+	{
+		Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+		Oid			relOid = relForm->oid;
+
+		/*
+		 * Do not pick up objects in pg_catalog as part of this; if an admin
+		 * really wishes to do so, they can issue the individual ALTER
+		 * commands directly.
+		 *
+		 * Also, explicitly avoid any shared tables, temp tables, or TOAST
+		 * (TOAST will be changed with the main table).
+		 */
+		if (IsCatalogNamespace(relForm->relnamespace) ||
+			relForm->relisshared ||
+			isAnyTempNamespace(relForm->relnamespace) ||
+			IsToastNamespace(relForm->relnamespace))
+			continue;
+
+		/* Only pick up the object type requested */
+		if (relForm->relkind != RELKIND_RELATION)
+			continue;
+
+		/* Check if we are only picking up objects owned by certain roles */
+		if (role_oids != NIL && !list_member_oid(role_oids, relForm->relowner))
+			continue;
+
+		/*
+		 * Handle permissions-checking here since we are locking the tables
+		 * and also to avoid doing a bunch of work only to fail part-way. Note
+		 * that permissions will also be checked by AlterTableInternal().
+		 *
+		 * Caller must be considered an owner on the table of which we're
+		 * going to change persistence.
+		 */
+		if (!object_ownercheck(RelationRelationId, relOid, GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relOid)),
+						   NameStr(relForm->relname));
+
+		if (stmt->nowait &&
+			!ConditionalLockRelationOid(relOid, AccessExclusiveLock))
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_IN_USE),
+					errmsg("aborting because lock on relation \"%s.%s\" is not available",
+						   get_namespace_name(relForm->relnamespace),
+						   NameStr(relForm->relname)));
+		else
+			LockRelationOid(relOid, AccessExclusiveLock);
+
+		/*
+		 * Add to our list of objects of which we're going to change
+		 * persistence.
+		 */
+		relations = lappend_oid(relations, relOid);
+	}
+
+	table_endscan(scan);
+	table_close(rel, AccessShareLock);
+
+	if (relations == NIL)
+		ereport(NOTICE,
+				errcode(ERRCODE_NO_DATA_FOUND),
+				errmsg("no matching relations in tablespace \"%s\" found",
+					   tablespaceoid == InvalidOid ? "(database default)" :
+					   get_tablespace_name(tablespaceoid)));
+
+	/*
+	 * Everything is locked, loop through and change persistence of all of the
+	 * relations.
+	 */
+	foreach(l, relations)
+	{
+		List	   *cmds = NIL;
+		AlterTableCmd *cmd = makeNode(AlterTableCmd);
+
+		if (stmt->logged)
+			cmd->subtype = AT_SetLogged;
+		else
+			cmd->subtype = AT_SetUnLogged;
+
+		cmds = lappend(cmds, cmd);
+
+		EventTriggerAlterTableStart((Node *) stmt);
+		/* OID is set by AlterTableInternal */
+		AlterTableInternal(lfirst_oid(l), cmds, false);
+		EventTriggerAlterTableEnd();
+	}
+}
+
 static void
 index_copy_data(Relation rel, RelFileLocator newrlocator)
 {
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index efe88ccf9d..1616130e01 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -2105,6 +2105,48 @@ AlterTableStmt:
 					n->nowait = $13;
 					$$ = (Node *) n;
 				}
+		|	ALTER TABLE ALL IN_P TABLESPACE name SET LOGGED opt_nowait
+				{
+					AlterTableSetLoggedAllStmt *n =
+						makeNode(AlterTableSetLoggedAllStmt);
+					n->tablespacename = $6;
+					n->objtype = OBJECT_TABLE;
+					n->logged = true;
+					n->nowait = $9;
+					$$ = (Node *) n;
+				}
+		|	ALTER TABLE ALL IN_P TABLESPACE name OWNED BY role_list SET LOGGED opt_nowait
+				{
+					AlterTableSetLoggedAllStmt *n =
+						makeNode(AlterTableSetLoggedAllStmt);
+					n->tablespacename = $6;
+					n->objtype = OBJECT_TABLE;
+					n->roles = $9;
+					n->logged = true;
+					n->nowait = $12;
+					$$ = (Node *) n;
+				}
+		|	ALTER TABLE ALL IN_P TABLESPACE name SET UNLOGGED opt_nowait
+				{
+					AlterTableSetLoggedAllStmt *n =
+						makeNode(AlterTableSetLoggedAllStmt);
+					n->tablespacename = $6;
+					n->objtype = OBJECT_TABLE;
+					n->logged = false;
+					n->nowait = $9;
+					$$ = (Node *) n;
+				}
+		|	ALTER TABLE ALL IN_P TABLESPACE name OWNED BY role_list SET UNLOGGED opt_nowait
+				{
+					AlterTableSetLoggedAllStmt *n =
+						makeNode(AlterTableSetLoggedAllStmt);
+					n->tablespacename = $6;
+					n->objtype = OBJECT_TABLE;
+					n->roles = $9;
+					n->logged = false;
+					n->nowait = $12;
+					$$ = (Node *) n;
+				}
 		|	ALTER INDEX qualified_name alter_table_cmds
 				{
 					AlterTableStmt *n = makeNode(AlterTableStmt);
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 2b00ec3eed..75d74caaba 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3309,7 +3309,7 @@ SetRelationBuffersPersistence(SMgrRelation srel, bool permanent, bool isRedo)
 				PinBuffer_Locked(bufHdr);
 				LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
 							  LW_SHARED);
-				FlushBuffer(bufHdr, srel);
+				FlushBuffer(bufHdr, srel, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
 				LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
 				UnpinBuffer(bufHdr);
 			}
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index c7d9d96b45..1cbd86e3c1 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -164,6 +164,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
 		case T_AlterTSConfigurationStmt:
 		case T_AlterTSDictionaryStmt:
 		case T_AlterTableMoveAllStmt:
+		case T_AlterTableSetLoggedAllStmt:
 		case T_AlterTableSpaceOptionsStmt:
 		case T_AlterTableStmt:
 		case T_AlterTypeStmt:
@@ -1760,6 +1761,12 @@ ProcessUtilitySlow(ParseState *pstate,
 				commandCollected = true;
 				break;
 
+			case T_AlterTableSetLoggedAllStmt:
+				AlterTableSetLoggedAll((AlterTableSetLoggedAllStmt *) parsetree);
+				/* commands are stashed in AlterTableSetLoggedAll */
+				commandCollected = true;
+				break;
+
 			case T_DropStmt:
 				ExecDropStmt((DropStmt *) parsetree, isTopLevel);
 				/* no commands stashed for DROP */
@@ -2688,6 +2695,10 @@ CreateCommandTag(Node *parsetree)
 			tag = AlterObjectTypeCommandTag(((AlterTableMoveAllStmt *) parsetree)->objtype);
 			break;
 
+		case T_AlterTableSetLoggedAllStmt:
+			tag = AlterObjectTypeCommandTag(((AlterTableSetLoggedAllStmt *) parsetree)->objtype);
+			break;
+
 		case T_AlterTableStmt:
 			tag = AlterObjectTypeCommandTag(((AlterTableStmt *) parsetree)->objtype);
 			break;
diff --git a/src/include/catalog/storage_xlog.h b/src/include/catalog/storage_xlog.h
index 6e79c68f5b..847660b6af 100644
--- a/src/include/catalog/storage_xlog.h
+++ b/src/include/catalog/storage_xlog.h
@@ -88,7 +88,7 @@ extern void log_smgrcreatemark(const RelFileLocator *rlocator,
 							   ForkNumber forkNum, StorageMarks mark);
 extern void log_smgrunlinkmark(const RelFileLocator *rlocator,
 							   ForkNumber forkNum, StorageMarks mark);
-extern void log_smgrbufpersistence(const RelFileLocator *rlocator,
+extern void log_smgrbufpersistence(const RelFileLocator rlocator,
 								   bool persistence);
 
 extern void smgr_redo(XLogReaderState *record);
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index e7c2b91a58..6c0b60475a 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -42,6 +42,8 @@ extern void AlterTableInternal(Oid relid, List *cmds, bool recurse);
 
 extern Oid	AlterTableMoveAll(AlterTableMoveAllStmt *stmt);
 
+extern void AlterTableSetLoggedAll(AlterTableSetLoggedAllStmt *stmt);
+
 extern ObjectAddress AlterTableNamespace(AlterObjectSchemaStmt *stmt,
 										 Oid *oldschema);
 
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 028588fb33..217b26aeec 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2537,6 +2537,16 @@ typedef struct AlterTableMoveAllStmt
 	bool		nowait;
 } AlterTableMoveAllStmt;
 
+typedef struct AlterTableSetLoggedAllStmt
+{
+	NodeTag		type;
+	char	   *tablespacename;	/* Name given after IN TABLESPACE */
+	ObjectType	objtype;		/* Object type to change persistence of */
+	List	   *roles;			/* Roles from the OWNED BY clause, if any */
+	bool		logged;			/* true = SET LOGGED, false = SET UNLOGGED */
+	bool		nowait;			/* Error out if a lock is not available */
+}			AlterTableSetLoggedAllStmt;
+
 /* ----------------------
  *		Create/Alter Extension Statements
  * ----------------------
diff --git a/src/test/regress/expected/tablespace.out b/src/test/regress/expected/tablespace.out
index 9aabb85349..35b150b297 100644
--- a/src/test/regress/expected/tablespace.out
+++ b/src/test/regress/expected/tablespace.out
@@ -964,5 +964,81 @@ drop cascades to table testschema.part
 drop cascades to table testschema.atable
 drop cascades to materialized view testschema.amv
 drop cascades to table testschema.tablespace_acl
+--
+-- Check persistence change in a tablespace
+CREATE SCHEMA testschema;
+GRANT CREATE ON SCHEMA testschema TO regress_tablespace_user1;
+CREATE TABLESPACE regress_tablespace LOCATION '';
+GRANT CREATE ON TABLESPACE regress_tablespace TO regress_tablespace_user1;
+CREATE TABLE testschema.lsu(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.usu(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lsu(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._usu(a int) TABLESPACE pg_default;
+SET ROLE regress_tablespace_user1;
+CREATE TABLE testschema.lu1(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.uu1(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lu1(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._uu1(a int) TABLESPACE pg_default;
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+ relname |      spcname       | relpersistence 
+---------+--------------------+----------------
+ lsu     | regress_tablespace | p
+ usu     | regress_tablespace | u
+ lu1     | regress_tablespace | p
+ uu1     | regress_tablespace | u
+ _lsu    |                    | p
+ _usu    |                    | u
+ _lu1    |                    | p
+ _uu1    |                    | u
+(8 rows)
+
+ALTER TABLE ALL IN TABLESPACE regress_tablespace
+	  OWNED BY regress_tablespace_user1 SET LOGGED;
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+ relname |      spcname       | relpersistence 
+---------+--------------------+----------------
+ lsu     | regress_tablespace | p
+ usu     | regress_tablespace | u
+ lu1     | regress_tablespace | p
+ uu1     | regress_tablespace | p
+ _lsu    |                    | p
+ _usu    |                    | u
+ _lu1    |                    | p
+ _uu1    |                    | u
+(8 rows)
+
+RESET ROLE;
+ALTER TABLE ALL IN TABLESPACE regress_tablespace SET UNLOGGED;
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+ relname |      spcname       | relpersistence 
+---------+--------------------+----------------
+ lsu     | regress_tablespace | u
+ usu     | regress_tablespace | u
+ lu1     | regress_tablespace | u
+ uu1     | regress_tablespace | u
+ _lsu    |                    | p
+ _usu    |                    | u
+ _lu1    |                    | p
+ _uu1    |                    | u
+(8 rows)
+
+-- Should succeed
+DROP SCHEMA testschema CASCADE;
+NOTICE:  drop cascades to 8 other objects
+DETAIL:  drop cascades to table testschema.lsu
+drop cascades to table testschema.usu
+drop cascades to table testschema._lsu
+drop cascades to table testschema._usu
+drop cascades to table testschema.lu1
+drop cascades to table testschema.uu1
+drop cascades to table testschema._lu1
+drop cascades to table testschema._uu1
+DROP TABLESPACE regress_tablespace;
 DROP ROLE regress_tablespace_user1;
 DROP ROLE regress_tablespace_user2;
diff --git a/src/test/regress/sql/tablespace.sql b/src/test/regress/sql/tablespace.sql
index d274d9615e..eb8e247a1d 100644
--- a/src/test/regress/sql/tablespace.sql
+++ b/src/test/regress/sql/tablespace.sql
@@ -429,5 +429,46 @@ DROP TABLESPACE regress_tblspace_renamed;
 
 DROP SCHEMA testschema CASCADE;
 
+
+--
+-- Check persistence change in a tablespace
+CREATE SCHEMA testschema;
+GRANT CREATE ON SCHEMA testschema TO regress_tablespace_user1;
+CREATE TABLESPACE regress_tablespace LOCATION '';
+GRANT CREATE ON TABLESPACE regress_tablespace TO regress_tablespace_user1;
+
+CREATE TABLE testschema.lsu(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.usu(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lsu(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._usu(a int) TABLESPACE pg_default;
+SET ROLE regress_tablespace_user1;
+CREATE TABLE testschema.lu1(a int) TABLESPACE regress_tablespace;
+CREATE UNLOGGED TABLE testschema.uu1(a int) TABLESPACE regress_tablespace;
+CREATE TABLE testschema._lu1(a int) TABLESPACE pg_default;
+CREATE UNLOGGED TABLE testschema._uu1(a int) TABLESPACE pg_default;
+
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+
+ALTER TABLE ALL IN TABLESPACE regress_tablespace
+	  OWNED BY regress_tablespace_user1 SET LOGGED;
+
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+
+RESET ROLE;
+
+ALTER TABLE ALL IN TABLESPACE regress_tablespace SET UNLOGGED;
+
+SELECT relname, t.spcname, relpersistence
+ FROM pg_class c LEFT JOIN pg_tablespace t ON (c.reltablespace = t.oid)
+ WHERE relnamespace = 'testschema'::regnamespace ORDER BY spcname, c.oid;
+
+-- Should succeed
+DROP SCHEMA testschema CASCADE;
+DROP TABLESPACE regress_tablespace;
+
 DROP ROLE regress_tablespace_user1;
 DROP ROLE regress_tablespace_user2;
-- 
2.31.1

Reply via email to