Here is another rebased patch set.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From c11a6893d2d509df1389a1c03081b6cc563d5683 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Wed, 5 Jan 2022 19:24:22 +0000
Subject: [PATCH v5 1/8] Introduce custodian.

The custodian process is a new auxiliary process that is intended
to help offload tasks that could otherwise delay startup and
checkpointing.  This commit simply adds the new process; it does
not yet do anything useful.
---
 src/backend/postmaster/Makefile         |   1 +
 src/backend/postmaster/auxprocess.c     |   8 +
 src/backend/postmaster/custodian.c      | 214 ++++++++++++++++++++++++
 src/backend/postmaster/postmaster.c     |  44 ++++-
 src/backend/storage/lmgr/proc.c         |   1 +
 src/backend/utils/activity/wait_event.c |   3 +
 src/backend/utils/init/miscinit.c       |   3 +
 src/include/miscadmin.h                 |   3 +
 src/include/postmaster/custodian.h      |  17 ++
 src/include/storage/proc.h              |  11 +-
 src/include/utils/wait_event.h          |   1 +
 11 files changed, 301 insertions(+), 5 deletions(-)
 create mode 100644 src/backend/postmaster/custodian.c
 create mode 100644 src/include/postmaster/custodian.h

diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index dbbeac5a82..1b7aae60f5 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -18,6 +18,7 @@ OBJS = \
 	bgworker.o \
 	bgwriter.o \
 	checkpointer.o \
+	custodian.o \
 	fork_process.o \
 	interrupt.o \
 	pgarch.o \
diff --git a/src/backend/postmaster/auxprocess.c b/src/backend/postmaster/auxprocess.c
index 0587e45920..7eae34884d 100644
--- a/src/backend/postmaster/auxprocess.c
+++ b/src/backend/postmaster/auxprocess.c
@@ -20,6 +20,7 @@
 #include "pgstat.h"
 #include "postmaster/auxprocess.h"
 #include "postmaster/bgwriter.h"
+#include "postmaster/custodian.h"
 #include "postmaster/startup.h"
 #include "postmaster/walwriter.h"
 #include "replication/walreceiver.h"
@@ -74,6 +75,9 @@ AuxiliaryProcessMain(AuxProcType auxtype)
 		case CheckpointerProcess:
 			MyBackendType = B_CHECKPOINTER;
 			break;
+		case CustodianProcess:
+			MyBackendType = B_CUSTODIAN;
+			break;
 		case WalWriterProcess:
 			MyBackendType = B_WAL_WRITER;
 			break;
@@ -153,6 +157,10 @@ AuxiliaryProcessMain(AuxProcType auxtype)
 			CheckpointerMain();
 			proc_exit(1);
 
+		case CustodianProcess:
+			CustodianMain();
+			proc_exit(1);
+
 		case WalWriterProcess:
 			WalWriterMain();
 			proc_exit(1);
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
new file mode 100644
index 0000000000..5f2b647544
--- /dev/null
+++ b/src/backend/postmaster/custodian.c
@@ -0,0 +1,214 @@
+/*-------------------------------------------------------------------------
+ *
+ * custodian.c
+ *
+ * The custodian process is new as of Postgres 15.  Its main purpose is to
+ * offload tasks that could otherwise delay startup and checkpointing, but
+ * it needn't be restricted to just those things.  Offloaded tasks should
+ * not be synchronous (e.g., checkpointing shouldn't need to wait for the
+ * custodian to complete a task before proceeding).  Also, ensure that any
+ * offloaded tasks are either not required during single-user mode or are
+ * performed separately during single-user mode.
+ *
+ * The custodian is not an essential process and can shut down quickly when
+ * requested.  The custodian will wake up approximately once every 5
+ * minutes to perform its tasks, but backends can (and should) set its
+ * latch to wake it up sooner.
+ *
+ * Normal termination is by SIGTERM, which instructs the custodian to
+ * exit(0).  Emergency termination is by SIGQUIT; like any backend, the
+ * custodian will simply abort and exit on SIGQUIT.
+ *
+ * If the custodian exits unexpectedly, the postmaster treats that the same
+ * as a backend crash: shared memory may be corrupted, so remaining
+ * backends should be killed by SIGQUIT and then a recovery cycle started.
+ *
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *   src/backend/postmaster/custodian.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <time.h>
+
+#include "libpq/pqsignal.h"
+#include "pgstat.h"
+#include "postmaster/custodian.h"
+#include "postmaster/interrupt.h"
+#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
+#include "storage/fd.h"
+#include "storage/proc.h"
+#include "storage/procsignal.h"
+#include "utils/memutils.h"
+
+#define CUSTODIAN_TIMEOUT_S (300)		/* 5 minutes */
+
+/*
+ * Main entry point for custodian process
+ *
+ * This is invoked from AuxiliaryProcessMain, which has already created the
+ * basic execution environment, but not enabled signals yet.
+ */
+void
+CustodianMain(void)
+{
+	sigjmp_buf	local_sigjmp_buf;
+	MemoryContext custodian_context;
+
+	/*
+	 * Properly accept or ignore signals that might be sent to us.
+	 */
+	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+	pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
+	/* SIGQUIT handler was already set up by InitPostmasterChild */
+	pqsignal(SIGALRM, SIG_IGN);
+	pqsignal(SIGPIPE, SIG_IGN);
+	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+	pqsignal(SIGUSR2, SIG_IGN);
+
+	/*
+	 * Reset some signals that are accepted by postmaster but not here
+	 */
+	pqsignal(SIGCHLD, SIG_DFL);
+
+	/*
+	 * Create a memory context that we will do all our work in.  We do this so
+	 * that we can reset the context during error recovery and thereby avoid
+	 * possible memory leaks.
+	 */
+	custodian_context = AllocSetContextCreate(TopMemoryContext,
+											  "Custodian",
+											  ALLOCSET_DEFAULT_SIZES);
+	MemoryContextSwitchTo(custodian_context);
+
+	/*
+	 * If an exception is encountered, processing resumes here.
+	 *
+	 * You might wonder why this isn't coded as an infinite loop around a
+	 * PG_TRY construct.  The reason is that this is the bottom of the
+	 * exception stack, and so with PG_TRY there would be no exception handler
+	 * in force at all during the CATCH part.  By leaving the outermost setjmp
+	 * always active, we have at least some chance of recovering from an error
+	 * during error recovery.  (If we get into an infinite loop thereby, it
+	 * will soon be stopped by overflow of elog.c's internal state stack.)
+	 *
+	 * Note that we use sigsetjmp(..., 1), so that the prevailing signal mask
+	 * (to wit, BlockSig) will be restored when longjmp'ing to here.  Thus,
+	 * signals other than SIGQUIT will be blocked until we complete error
+	 * recovery.  It might seem that this policy makes the HOLD_INTERRUPTS()
+	 * call redundant, but it is not since InterruptPending might be set
+	 * already.
+	 */
+	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+	{
+		/* Since not using PG_TRY, must reset error stack by hand */
+		error_context_stack = NULL;
+
+		/* Prevent interrupts while cleaning up */
+		HOLD_INTERRUPTS();
+
+		/* Report the error to the server log */
+		EmitErrorReport();
+
+		/*
+		 * These operations are really just a minimal subset of
+		 * AbortTransaction().  We don't have very many resources to worry
+		 * about.
+		 */
+		LWLockReleaseAll();
+		ConditionVariableCancelSleep();
+		pgstat_report_wait_end();
+		AbortBufferIO();
+		UnlockBuffers();
+		ReleaseAuxProcessResources(false);
+		AtEOXact_Buffers(false);
+		AtEOXact_SMgr();
+		AtEOXact_Files(false);
+		AtEOXact_HashTables(false);
+
+		/*
+		 * Now return to normal top-level context and clear ErrorContext for
+		 * next time.
+		 */
+		MemoryContextSwitchTo(custodian_context);
+		FlushErrorState();
+
+		/* Flush any leaked data in the top-level context */
+		MemoryContextResetAndDeleteChildren(custodian_context);
+
+		/* Now we can allow interrupts again */
+		RESUME_INTERRUPTS();
+
+		/*
+		 * Sleep at least 1 second after any error.  A write error is likely
+		 * to be repeated, and we don't want to be filling the error logs as
+		 * fast as we can.
+		 */
+		pg_usleep(1000000L);
+
+		/*
+		 * Close all open files after any error.  This is helpful on Windows,
+		 * where holding deleted files open causes various strange errors.
+		 * It's not clear we need it elsewhere, but shouldn't hurt.
+		 */
+		smgrcloseall();
+
+		/* Report wait end here, when there is no further possibility of wait */
+		pgstat_report_wait_end();
+	}
+
+	/* We can now handle ereport(ERROR) */
+	PG_exception_stack = &local_sigjmp_buf;
+
+	/*
+	 * Unblock signals (they were blocked when the postmaster forked us)
+	 */
+	PG_SETMASK(&UnBlockSig);
+
+	/*
+	 * Advertise our latch that backends can use to wake us up while we're
+	 * sleeping.
+	 */
+	ProcGlobal->custodianLatch = &MyProc->procLatch;
+
+	/*
+	 * Loop forever
+	 */
+	for (;;)
+	{
+		pg_time_t	start_time;
+		pg_time_t	end_time;
+		int			elapsed_secs;
+		int			cur_timeout;
+
+		/* Clear any already-pending wakeups */
+		ResetLatch(MyLatch);
+
+		HandleMainLoopInterrupts();
+
+		start_time = (pg_time_t) time(NULL);
+
+		/* TODO: offloaded tasks go here */
+
+		/* Sleep for whatever remains of the CUSTODIAN_TIMEOUT_S interval */
+		end_time = (pg_time_t) time(NULL);
+		elapsed_secs = end_time - start_time;
+		if (elapsed_secs >= CUSTODIAN_TIMEOUT_S)
+			continue;			/* no sleep for us */
+		cur_timeout = CUSTODIAN_TIMEOUT_S - elapsed_secs;
+
+		(void) WaitLatch(MyLatch,
+						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+						 cur_timeout * 1000L /* convert to ms */ ,
+						 WAIT_EVENT_CUSTODIAN_MAIN);
+	}
+
+	pg_unreachable();
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 735fed490b..a867412268 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -251,6 +251,7 @@ bool		remove_temp_files_after_crash = true;
 static pid_t StartupPID = 0,
 			BgWriterPID = 0,
 			CheckpointerPID = 0,
+			CustodianPID = 0,
 			WalWriterPID = 0,
 			WalReceiverPID = 0,
 			AutoVacPID = 0,
@@ -557,6 +558,7 @@ static void ShmemBackendArrayRemove(Backend *bn);
 #define StartArchiver()			StartChildProcess(ArchiverProcess)
 #define StartBackgroundWriter() StartChildProcess(BgWriterProcess)
 #define StartCheckpointer()		StartChildProcess(CheckpointerProcess)
+#define StartCustodian()		StartChildProcess(CustodianProcess)
 #define StartWalWriter()		StartChildProcess(WalWriterProcess)
 #define StartWalReceiver()		StartChildProcess(WalReceiverProcess)
 
@@ -1818,13 +1820,16 @@ ServerLoop(void)
 		/*
 		 * If no background writer process is running, and we are not in a
 		 * state that prevents it, start one.  It doesn't matter if this
-		 * fails, we'll just try again later.  Likewise for the checkpointer.
+		 * fails, we'll just try again later.  Likewise for the checkpointer
+		 * and custodian.
 		 */
 		if (pmState == PM_RUN || pmState == PM_RECOVERY ||
 			pmState == PM_HOT_STANDBY || pmState == PM_STARTUP)
 		{
 			if (CheckpointerPID == 0)
 				CheckpointerPID = StartCheckpointer();
+			if (CustodianPID == 0)
+				CustodianPID = StartCustodian();
 			if (BgWriterPID == 0)
 				BgWriterPID = StartBackgroundWriter();
 		}
@@ -2781,6 +2786,8 @@ SIGHUP_handler(SIGNAL_ARGS)
 			signal_child(BgWriterPID, SIGHUP);
 		if (CheckpointerPID != 0)
 			signal_child(CheckpointerPID, SIGHUP);
+		if (CustodianPID != 0)
+			signal_child(CustodianPID, SIGHUP);
 		if (WalWriterPID != 0)
 			signal_child(WalWriterPID, SIGHUP);
 		if (WalReceiverPID != 0)
@@ -3108,6 +3115,8 @@ reaper(SIGNAL_ARGS)
 			 */
 			if (CheckpointerPID == 0)
 				CheckpointerPID = StartCheckpointer();
+			if (CustodianPID == 0)
+				CustodianPID = StartCustodian();
 			if (BgWriterPID == 0)
 				BgWriterPID = StartBackgroundWriter();
 			if (WalWriterPID == 0)
@@ -3210,6 +3219,20 @@ reaper(SIGNAL_ARGS)
 			continue;
 		}
 
+		/*
+		 * Was it the custodian?  Normal exit can be ignored; we'll start a
+		 * new one at the next iteration of the postmaster's main loop, if
+		 * necessary.  Any other exit condition is treated as a crash.
+		 */
+		if (pid == CustodianPID)
+		{
+			CustodianPID = 0;
+			if (!EXIT_STATUS_0(exitstatus))
+				HandleChildCrash(pid, exitstatus,
+								 _("custodian process"));
+			continue;
+		}
+
 		/*
 		 * Was it the wal writer?  Normal exit can be ignored; we'll start a
 		 * new one at the next iteration of the postmaster's main loop, if
@@ -3683,6 +3706,18 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
 		signal_child(CheckpointerPID, (SendStop ? SIGSTOP : SIGQUIT));
 	}
 
+	/* Take care of the custodian too */
+	if (pid == CustodianPID)
+		CustodianPID = 0;
+	else if (CustodianPID != 0 && take_action)
+	{
+		ereport(DEBUG2,
+				(errmsg_internal("sending %s to process %d",
+								 (SendStop ? "SIGSTOP" : "SIGQUIT"),
+								 (int) CustodianPID)));
+		signal_child(CustodianPID, (SendStop ? SIGSTOP : SIGQUIT));
+	}
+
 	/* Take care of the walwriter too */
 	if (pid == WalWriterPID)
 		WalWriterPID = 0;
@@ -3886,6 +3921,9 @@ PostmasterStateMachine(void)
 		/* and the bgwriter too */
 		if (BgWriterPID != 0)
 			signal_child(BgWriterPID, SIGTERM);
+		/* and the custodian too */
+		if (CustodianPID != 0)
+			signal_child(CustodianPID, SIGTERM);
 		/* and the walwriter too */
 		if (WalWriterPID != 0)
 			signal_child(WalWriterPID, SIGTERM);
@@ -3923,6 +3961,7 @@ PostmasterStateMachine(void)
 			BgWriterPID == 0 &&
 			(CheckpointerPID == 0 ||
 			 (!FatalError && Shutdown < ImmediateShutdown)) &&
+			CustodianPID == 0 &&
 			WalWriterPID == 0 &&
 			AutoVacPID == 0)
 		{
@@ -4016,6 +4055,7 @@ PostmasterStateMachine(void)
 			Assert(WalReceiverPID == 0);
 			Assert(BgWriterPID == 0);
 			Assert(CheckpointerPID == 0);
+			Assert(CustodianPID == 0);
 			Assert(WalWriterPID == 0);
 			Assert(AutoVacPID == 0);
 			/* syslogger is not considered here */
@@ -4221,6 +4261,8 @@ TerminateChildren(int signal)
 		signal_child(BgWriterPID, signal);
 	if (CheckpointerPID != 0)
 		signal_child(CheckpointerPID, signal);
+	if (CustodianPID != 0)
+		signal_child(CustodianPID, signal);
 	if (WalWriterPID != 0)
 		signal_child(WalWriterPID, signal);
 	if (WalReceiverPID != 0)
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 90283f8a9f..1e693b69e5 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -181,6 +181,7 @@ InitProcGlobal(void)
 	ProcGlobal->startupBufferPinWaitBufId = -1;
 	ProcGlobal->walwriterLatch = NULL;
 	ProcGlobal->checkpointerLatch = NULL;
+	ProcGlobal->custodianLatch = NULL;
 	pg_atomic_init_u32(&ProcGlobal->procArrayGroupFirst, INVALID_PGPROCNO);
 	pg_atomic_init_u32(&ProcGlobal->clogGroupFirst, INVALID_PGPROCNO);
 
diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index 60972c3a75..e10cc2d82b 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -224,6 +224,9 @@ pgstat_get_wait_activity(WaitEventActivity w)
 		case WAIT_EVENT_CHECKPOINTER_MAIN:
 			event_name = "CheckpointerMain";
 			break;
+		case WAIT_EVENT_CUSTODIAN_MAIN:
+			event_name = "CustodianMain";
+			break;
 		case WAIT_EVENT_LOGICAL_APPLY_MAIN:
 			event_name = "LogicalApplyMain";
 			break;
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 0868e5a24f..8b52757ea6 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -274,6 +274,9 @@ GetBackendTypeDesc(BackendType backendType)
 		case B_CHECKPOINTER:
 			backendDesc = "checkpointer";
 			break;
+		case B_CUSTODIAN:
+			backendDesc = "custodian";
+			break;
 		case B_STARTUP:
 			backendDesc = "startup";
 			break;
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 0abc3ad540..71f522878e 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -328,6 +328,7 @@ typedef enum BackendType
 	B_BG_WORKER,
 	B_BG_WRITER,
 	B_CHECKPOINTER,
+	B_CUSTODIAN,
 	B_STARTUP,
 	B_WAL_RECEIVER,
 	B_WAL_SENDER,
@@ -432,6 +433,7 @@ typedef enum
 	BgWriterProcess,
 	ArchiverProcess,
 	CheckpointerProcess,
+	CustodianProcess,
 	WalWriterProcess,
 	WalReceiverProcess,
 
@@ -444,6 +446,7 @@ extern AuxProcType MyAuxProcType;
 #define AmBackgroundWriterProcess() (MyAuxProcType == BgWriterProcess)
 #define AmArchiverProcess()			(MyAuxProcType == ArchiverProcess)
 #define AmCheckpointerProcess()		(MyAuxProcType == CheckpointerProcess)
+#define AmCustodianProcess()		(MyAuxProcType == CustodianProcess)
 #define AmWalWriterProcess()		(MyAuxProcType == WalWriterProcess)
 #define AmWalReceiverProcess()		(MyAuxProcType == WalReceiverProcess)
 
diff --git a/src/include/postmaster/custodian.h b/src/include/postmaster/custodian.h
new file mode 100644
index 0000000000..cf0a04ca6c
--- /dev/null
+++ b/src/include/postmaster/custodian.h
@@ -0,0 +1,17 @@
+/*-------------------------------------------------------------------------
+ *
+ * custodian.h
+ *   Exports from postmaster/custodian.c.
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * src/include/postmaster/custodian.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _CUSTODIAN_H
+#define _CUSTODIAN_H
+
+extern void CustodianMain(void) pg_attribute_noreturn();
+
+#endif						/* _CUSTODIAN_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index a58888f9e9..ad61b4d802 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -357,6 +357,8 @@ typedef struct PROC_HDR
 	Latch	   *walwriterLatch;
 	/* Checkpointer process's latch */
 	Latch	   *checkpointerLatch;
+	/* Custodian process's latch */
+	Latch	   *custodianLatch;
 	/* Current shared estimate of appropriate spins_per_delay value */
 	int			spins_per_delay;
 	/* Buffer id of the buffer that Startup process waits for pin on, or -1 */
@@ -374,11 +376,12 @@ extern PGPROC *PreparedXactProcs;
  * We set aside some extra PGPROC structures for auxiliary processes,
  * ie things that aren't full-fledged backends but need shmem access.
  *
- * Background writer, checkpointer, WAL writer and archiver run during normal
- * operation.  Startup process and WAL receiver also consume 2 slots, but WAL
- * writer is launched only after startup has exited, so we only need 5 slots.
+ * Background writer, checkpointer, custodian, WAL writer and archiver run
+ * during normal operation.  Startup process and WAL receiver also consume 2
+ * slots, but WAL writer is launched only after startup has exited, so we only
+ * need 6 slots.
  */
-#define NUM_AUXILIARY_PROCS		5
+#define NUM_AUXILIARY_PROCS		6
 
 /* configurable options */
 extern PGDLLIMPORT int DeadlockTimeout;
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index 395d325c5f..1338d06823 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -40,6 +40,7 @@ typedef enum
 	WAIT_EVENT_BGWRITER_HIBERNATE,
 	WAIT_EVENT_BGWRITER_MAIN,
 	WAIT_EVENT_CHECKPOINTER_MAIN,
+	WAIT_EVENT_CUSTODIAN_MAIN,
 	WAIT_EVENT_LOGICAL_APPLY_MAIN,
 	WAIT_EVENT_LOGICAL_LAUNCHER_MAIN,
 	WAIT_EVENT_PGSTAT_MAIN,
-- 
2.25.1

>From d9826f75ad2259984d55fc04622f0b91ebbba65a Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Sun, 5 Dec 2021 19:38:20 -0800
Subject: [PATCH v5 2/8] Also remove pgsql_tmp directories during startup.

Presently, the server only removes the contents of the temporary
directories during startup, not the directory itself.  This changes
that to prepare for future commits that will move temporary file
cleanup to a separate auxiliary process.
---
 src/backend/postmaster/postmaster.c         |  2 +-
 src/backend/storage/file/fd.c               | 20 ++++++++++----------
 src/include/storage/fd.h                    |  4 ++--
 src/test/recovery/t/022_crash_temp_files.pl |  6 ++++--
 4 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index a867412268..54f77cb306 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1116,7 +1116,7 @@ PostmasterMain(int argc, char *argv[])
 	 * safe to do so now, because we verified earlier that there are no
 	 * conflicting Postgres processes in this data directory.
 	 */
-	RemovePgTempFilesInDir(PG_TEMP_FILES_DIR, true, false);
+	RemovePgTempDir(PG_TEMP_FILES_DIR, true, false);
 #endif
 
 	/*
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 14b77f2861..35cb6f7bb6 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -3160,7 +3160,7 @@ RemovePgTempFiles(void)
 	 * First process temp files in pg_default ($PGDATA/base)
 	 */
 	snprintf(temp_path, sizeof(temp_path), "base/%s", PG_TEMP_FILES_DIR);
-	RemovePgTempFilesInDir(temp_path, true, false);
+	RemovePgTempDir(temp_path, true, false);
 	RemovePgTempRelationFiles("base");
 
 	/*
@@ -3176,7 +3176,7 @@ RemovePgTempFiles(void)
 
 		snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s/%s",
 				 spc_de->d_name, TABLESPACE_VERSION_DIRECTORY, PG_TEMP_FILES_DIR);
-		RemovePgTempFilesInDir(temp_path, true, false);
+		RemovePgTempDir(temp_path, true, false);
 
 		snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s",
 				 spc_de->d_name, TABLESPACE_VERSION_DIRECTORY);
@@ -3209,7 +3209,7 @@ RemovePgTempFiles(void)
  * them separate.)
  */
 void
-RemovePgTempFilesInDir(const char *tmpdirname, bool missing_ok, bool unlink_all)
+RemovePgTempDir(const char *tmpdirname, bool missing_ok, bool unlink_all)
 {
 	DIR		   *temp_dir;
 	struct dirent *temp_de;
@@ -3247,13 +3247,7 @@ RemovePgTempFilesInDir(const char *tmpdirname, bool missing_ok, bool unlink_all)
 			if (S_ISDIR(statbuf.st_mode))
 			{
 				/* recursively remove contents, then directory itself */
-				RemovePgTempFilesInDir(rm_path, false, true);
-
-				if (rmdir(rm_path) < 0)
-					ereport(LOG,
-							(errcode_for_file_access(),
-							 errmsg("could not remove directory \"%s\": %m",
-									rm_path)));
+				RemovePgTempDir(rm_path, false, true);
 			}
 			else
 			{
@@ -3271,6 +3265,12 @@ RemovePgTempFilesInDir(const char *tmpdirname, bool missing_ok, bool unlink_all)
 	}
 
 	FreeDir(temp_dir);
+
+	if (rmdir(tmpdirname) < 0)
+		ereport(LOG,
+				(errcode_for_file_access(),
+				 errmsg("could not remove directory \"%s\": %m",
+						tmpdirname)));
 }
 
 /* Process one tablespace directory, look for per-DB subdirectories */
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 29209e2724..525847daea 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -169,8 +169,8 @@ extern void AtEOXact_Files(bool isCommit);
 extern void AtEOSubXact_Files(bool isCommit, SubTransactionId mySubid,
 							  SubTransactionId parentSubid);
 extern void RemovePgTempFiles(void);
-extern void RemovePgTempFilesInDir(const char *tmpdirname, bool missing_ok,
-								   bool unlink_all);
+extern void RemovePgTempDir(const char *tmpdirname, bool missing_ok,
+							bool unlink_all);
 extern bool looks_like_temp_rel_name(const char *name);
 
 extern int	pg_fsync(int fd);
diff --git a/src/test/recovery/t/022_crash_temp_files.pl b/src/test/recovery/t/022_crash_temp_files.pl
index 6ab3092874..fd93368434 100644
--- a/src/test/recovery/t/022_crash_temp_files.pl
+++ b/src/test/recovery/t/022_crash_temp_files.pl
@@ -138,7 +138,8 @@ $node->poll_query_until('postgres', undef, '');
 
 # Check for temporary files
 is( $node->safe_psql(
-		'postgres', 'SELECT COUNT(1) FROM pg_ls_dir($$base/pgsql_tmp$$)'),
+		'postgres',
+		'SELECT COUNT(1) FROM pg_ls_dir($$base$$) WHERE pg_ls_dir = \'pgsql_tmp\''),
 	qq(0),
 	'no temporary files');
 
@@ -236,7 +237,8 @@ $node->restart();
 
 # Check the temporary files -- should be gone
 is( $node->safe_psql(
-		'postgres', 'SELECT COUNT(1) FROM pg_ls_dir($$base/pgsql_tmp$$)'),
+		'postgres',
+		'SELECT COUNT(1) FROM pg_ls_dir($$base$$) WHERE pg_ls_dir = \'pgsql_tmp\''),
 	qq(0),
 	'temporary file was removed');
 
-- 
2.25.1

>From 72baf897a3c733b30bb4bf63e63825b9bc6a6acf Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Sun, 5 Dec 2021 21:16:44 -0800
Subject: [PATCH v5 3/8] Split pgsql_tmp cleanup into two stages.

First, pgsql_tmp directories will be renamed to stage them for
removal.  Then, all files in pgsql_tmp are removed before removing
the staged directories themselves.  This change is being made in
preparation for a follow-up change to offload most temporary file
cleanup to the new custodian process.

Note that temporary relation files cannot be cleaned up via the
aforementioned strategy and will not be offloaded to the custodian.
---
 src/backend/postmaster/postmaster.c |   8 +-
 src/backend/storage/file/fd.c       | 176 ++++++++++++++++++++++++----
 src/include/storage/fd.h            |   2 +-
 3 files changed, 162 insertions(+), 24 deletions(-)

diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 54f77cb306..8248d55e23 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1391,7 +1391,8 @@ PostmasterMain(int argc, char *argv[])
 	 * Remove old temporary files.  At this point there can be no other
 	 * Postgres processes running in this directory, so this should be safe.
 	 */
-	RemovePgTempFiles();
+	RemovePgTempFiles(true, true);
+	RemovePgTempFiles(false, false);
 
 	/*
 	 * Initialize stats collection subsystem (this does NOT start the
@@ -4139,7 +4140,10 @@ PostmasterStateMachine(void)
 
 		/* remove leftover temporary files after a crash */
 		if (remove_temp_files_after_crash)
-			RemovePgTempFiles();
+		{
+			RemovePgTempFiles(true, true);
+			RemovePgTempFiles(false, false);
+		}
 
 		/* allow background workers to immediately restart */
 		ResetBackgroundWorkerCrashTimes();
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 35cb6f7bb6..d3019a4b67 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -112,6 +112,8 @@
 #define PG_FLUSH_DATA_WORKS 1
 #endif
 
+#define PG_TEMP_DIR_TO_REMOVE_PREFIX (PG_TEMP_FILES_DIR "_to_remove_")
+
 /*
  * We must leave some file descriptors free for system(), the dynamic loader,
  * and other code that tries to open files without consulting fd.c.  This
@@ -338,6 +340,8 @@ static void BeforeShmemExit_Files(int code, Datum arg);
 static void CleanupTempFiles(bool isCommit, bool isProcExit);
 static void RemovePgTempRelationFiles(const char *tsdirname);
 static void RemovePgTempRelationFilesInDbspace(const char *dbspacedirname);
+static void StagePgTempDirForRemoval(const char *tmp_dir);
+static void RemoveStagedPgTempDirs(const char *spc_dir);
 
 static void walkdir(const char *path,
 					void (*action) (const char *fname, bool isdir, int elevel),
@@ -3133,24 +3137,20 @@ CleanupTempFiles(bool isCommit, bool isProcExit)
  * Remove temporary and temporary relation files left over from a prior
  * postmaster session
  *
- * This should be called during postmaster startup.  It will forcibly
- * remove any leftover files created by OpenTemporaryFile and any leftover
- * temporary relation files created by mdcreate.
+ * If stage is true, this function will simply rename all pgsql_tmp directories
+ * to stage them for removal at a later time.  If stage is false, this function
+ * will delete all files in the staged directories as well as the directories
+ * themselves.
  *
- * During post-backend-crash restart cycle, this routine is called when
- * remove_temp_files_after_crash GUC is enabled. Multiple crashes while
- * queries are using temp files could result in useless storage usage that can
- * only be reclaimed by a service restart. The argument against enabling it is
- * that someone might want to examine the temporary files for debugging
- * purposes. This does however mean that OpenTemporaryFile had better allow for
- * collision with an existing temp file name.
+ * If remove_relation_files is true, this function will remove the temporary
+ * relation files.  Otherwise, this step is skipped.
  *
  * NOTE: this function and its subroutines generally report syscall failures
  * with ereport(LOG) and keep going.  Removing temp files is not so critical
  * that we should fail to start the database when we can't do it.
  */
 void
-RemovePgTempFiles(void)
+RemovePgTempFiles(bool stage, bool remove_relation_files)
 {
 	char		temp_path[MAXPGPATH + 10 + sizeof(TABLESPACE_VERSION_DIRECTORY) + sizeof(PG_TEMP_FILES_DIR)];
 	DIR		   *spc_dir;
@@ -3159,9 +3159,16 @@ RemovePgTempFiles(void)
 	/*
 	 * First process temp files in pg_default ($PGDATA/base)
 	 */
-	snprintf(temp_path, sizeof(temp_path), "base/%s", PG_TEMP_FILES_DIR);
-	RemovePgTempDir(temp_path, true, false);
-	RemovePgTempRelationFiles("base");
+	if (stage)
+	{
+		snprintf(temp_path, sizeof(temp_path), "base/%s", PG_TEMP_FILES_DIR);
+		StagePgTempDirForRemoval(temp_path);
+	}
+	else
+		RemoveStagedPgTempDirs("base");
+
+	if (remove_relation_files)
+		RemovePgTempRelationFiles("base");
 
 	/*
 	 * Cycle through temp directories for all non-default tablespaces.
@@ -3174,13 +3181,26 @@ RemovePgTempFiles(void)
 			strcmp(spc_de->d_name, "..") == 0)
 			continue;
 
-		snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s/%s",
-				 spc_de->d_name, TABLESPACE_VERSION_DIRECTORY, PG_TEMP_FILES_DIR);
-		RemovePgTempDir(temp_path, true, false);
+		if (stage)
+		{
+			snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s/%s",
+					 spc_de->d_name, TABLESPACE_VERSION_DIRECTORY,
+					 PG_TEMP_FILES_DIR);
+			StagePgTempDirForRemoval(temp_path);
+		}
+		else
+		{
+			snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s",
+					 spc_de->d_name, TABLESPACE_VERSION_DIRECTORY);
+			RemoveStagedPgTempDirs(temp_path);
+		}
 
-		snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s",
-				 spc_de->d_name, TABLESPACE_VERSION_DIRECTORY);
-		RemovePgTempRelationFiles(temp_path);
+		if (remove_relation_files)
+		{
+			snprintf(temp_path, sizeof(temp_path), "pg_tblspc/%s/%s",
+					 spc_de->d_name, TABLESPACE_VERSION_DIRECTORY);
+			RemovePgTempRelationFiles(temp_path);
+		}
 	}
 
 	FreeDir(spc_dir);
@@ -3194,7 +3214,121 @@ RemovePgTempFiles(void)
 }
 
 /*
- * Process one pgsql_tmp directory for RemovePgTempFiles.
+ * StagePgTempDirForRemoval
+ *
+ * This function renames the given directory with a special prefix that
+ * RemoveStagedPgTempDirs() will know to look for.  An integer is appended to
+ * the end of the new directory name in case previously staged pgsql_tmp
+ * directories have not yet been removed.
+ */
+static void
+StagePgTempDirForRemoval(const char *tmp_dir)
+{
+	DIR		   *dir;
+	char		stage_path[MAXPGPATH * 2];
+	char		parent_path[MAXPGPATH * 2];
+
+	/*
+	 * If tmp_dir doesn't exist, there is nothing to stage.
+	 */
+	dir = AllocateDir(tmp_dir);
+	if (dir == NULL)
+	{
+		if (errno != ENOENT)
+			ereport(LOG,
+					(errcode_for_file_access(),
+					 errmsg("could not open directory \"%s\": %m", tmp_dir)));
+		return;
+	}
+	FreeDir(dir);
+
+	strlcpy(parent_path, tmp_dir, sizeof(parent_path));
+	get_parent_directory(parent_path);
+
+	/*
+	 * get_parent_directory() returns an empty string if the input argument is
+	 * just a file name (see comments in path.c), so handle that as being the
+	 * current directory.
+	 */
+	if (parent_path[0] == '\0')
+		strlcpy(parent_path, ".", sizeof(parent_path));
+
+	/*
+	 * Find an unused name for the stage directory by trying suffixes 0, 1, ...
+	 * in turn.  Stop short of INT_MAX so that n++ can never overflow.
+	 */
+	for (int n = 0; n < INT_MAX; n++)
+	{
+		snprintf(stage_path, sizeof(stage_path), "%s/%s%d", parent_path,
+				 PG_TEMP_DIR_TO_REMOVE_PREFIX, n);
+
+		dir = AllocateDir(stage_path);
+		if (dir == NULL)
+		{
+			if (errno == ENOENT)
+				break;
+
+			ereport(LOG,
+					(errcode_for_file_access(),
+					 errmsg("could not open directory \"%s\": %m",
+							stage_path)));
+			return;
+		}
+		FreeDir(dir);
+
+		stage_path[0] = '\0';
+	}
+
+	/*
+	 * In the unlikely event that every candidate name was taken, the loop
+	 * ends with stage_path cleared; bail out.
+	 */
+	if (stage_path[0] == '\0')
+	{
+		ereport(LOG,
+				(errmsg("could not stage \"%s\" for deletion",
+						tmp_dir)));
+		return;
+	}
+
+	/*
+	 * Rename the temporary directory.
+	 */
+	if (rename(tmp_dir, stage_path) != 0)
+		ereport(LOG,
+				(errcode_for_file_access(),
+				 errmsg("could not rename directory \"%s\" to \"%s\": %m",
+						tmp_dir, stage_path)));
+}
+
+/*
+ * RemoveStagedPgTempDirs
+ *
+ * Remove, along with their contents, the directories under spc_dir that
+ * StagePgTempDirForRemoval() renamed with PG_TEMP_DIR_TO_REMOVE_PREFIX.
+ */
+static void
+RemoveStagedPgTempDirs(const char *spc_dir)
+{
+	const size_t pfx_len = strlen(PG_TEMP_DIR_TO_REMOVE_PREFIX);
+	char		path[MAXPGPATH * 2];
+	DIR		   *spc;
+	struct dirent *entry;
+
+	spc = AllocateDir(spc_dir);
+	while ((entry = ReadDirExtended(spc, spc_dir, LOG)) != NULL)
+	{
+		if (strncmp(entry->d_name, PG_TEMP_DIR_TO_REMOVE_PREFIX, pfx_len) == 0)
+		{
+			snprintf(path, sizeof(path), "%s/%s", spc_dir, entry->d_name);
+			RemovePgTempDir(path, true, false);
+		}
+	}
+	FreeDir(spc);
+}
+
+/*
+ * Process one pgsql_tmp directory for RemoveStagedPgTempDirs.
  *
  * If missing_ok is true, it's all right for the named directory to not exist.
  * Any other problem results in a LOG message.  (missing_ok should be true at
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 525847daea..240992ca51 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -168,7 +168,7 @@ extern Oid	GetNextTempTableSpace(void);
 extern void AtEOXact_Files(bool isCommit);
 extern void AtEOSubXact_Files(bool isCommit, SubTransactionId mySubid,
 							  SubTransactionId parentSubid);
-extern void RemovePgTempFiles(void);
+extern void RemovePgTempFiles(bool stage, bool remove_relation_files);
 extern void RemovePgTempDir(const char *tmpdirname, bool missing_ok,
 							bool unlink_all);
 extern bool looks_like_temp_rel_name(const char *name);
-- 
2.25.1

>From f0c75b9bdd490ca3290f7fba7cc9fff2423cde30 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Sun, 5 Dec 2021 21:42:52 -0800
Subject: [PATCH v5 4/8] Move pgsql_tmp file removal to custodian process.

With this change, startup (and restart after a crash) simply
renames the pgsql_tmp directories, and the custodian process
actually removes all the files in the staged directories as well as
the staged directories themselves.  This should help avoid long
startup delays due to many leftover temporary files.
---
 src/backend/postmaster/custodian.c  | 13 +++++++++++-
 src/backend/postmaster/postmaster.c | 14 ++++++++-----
 src/backend/storage/file/fd.c       | 32 +++++++++++++++++++++--------
 3 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index 5f2b647544..5bad0af474 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -195,7 +195,18 @@ CustodianMain(void)
 
 		start_time = (pg_time_t) time(NULL);
 
-		/* TODO: offloaded tasks go here */
+		/*
+		 * Remove any pgsql_tmp directories that have been staged for deletion.
+		 * Since pgsql_tmp directories can accumulate many files, removing all
+		 * of the files during startup (which we used to do) can take a very
+		 * long time.  To avoid delaying startup, we simply have startup rename
+		 * the temporary directories, and we clean them up here.
+		 *
+		 * pgsql_tmp directories are not staged or cleaned in single-user mode,
+		 * so we don't need any extra handling outside of the custodian process
+		 * for this.
+		 */
+		RemovePgTempFiles(false, false);
 
 		/* Calculate how long to sleep */
 		end_time = (pg_time_t) time(NULL);
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 8248d55e23..56b87d79a3 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1390,9 +1390,11 @@ PostmasterMain(int argc, char *argv[])
 	/*
 	 * Remove old temporary files.  At this point there can be no other
 	 * Postgres processes running in this directory, so this should be safe.
+	 *
+	 * Note that this just stages the pgsql_tmp directories for deletion.  The
+	 * custodian process is responsible for actually removing the files.
 	 */
 	RemovePgTempFiles(true, true);
-	RemovePgTempFiles(false, false);
 
 	/*
 	 * Initialize stats collection subsystem (this does NOT start the
@@ -4138,12 +4140,14 @@ PostmasterStateMachine(void)
 		ereport(LOG,
 				(errmsg("all server processes terminated; reinitializing")));
 
-		/* remove leftover temporary files after a crash */
+		/*
+		 * Remove leftover temporary files after a crash.
+		 *
+		 * Note that this just stages the pgsql_tmp directories for deletion.
+		 * The custodian process is responsible for actually removing the files.
+		 */
 		if (remove_temp_files_after_crash)
-		{
 			RemovePgTempFiles(true, true);
-			RemovePgTempFiles(false, false);
-		}
 
 		/* allow background workers to immediately restart */
 		ResetBackgroundWorkerCrashTimes();
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index d3019a4b67..5d39a31d14 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -97,9 +97,12 @@
 #include "pgstat.h"
 #include "port/pg_iovec.h"
 #include "portability/mem.h"
+#include "postmaster/interrupt.h"
 #include "postmaster/startup.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
 #include "utils/guc.h"
 #include "utils/resowner_private.h"
 
@@ -1640,9 +1643,9 @@ PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode)
  *
  * Directories created within the top-level temporary directory should begin
  * with PG_TEMP_FILE_PREFIX, so that they can be identified as temporary and
- * deleted at startup by RemovePgTempFiles().  Further subdirectories below
- * that do not need any particular prefix.
-*/
+ * deleted by RemovePgTempFiles().  Further subdirectories below that do not
+ * need any particular prefix.
+ */
 void
 PathNameCreateTemporaryDir(const char *basedir, const char *directory)
 {
@@ -1840,9 +1843,9 @@ OpenTemporaryFileInTablespace(Oid tblspcOid, bool rejectError)
  *
  * If the file is inside the top-level temporary directory, its name should
  * begin with PG_TEMP_FILE_PREFIX so that it can be identified as temporary
- * and deleted at startup by RemovePgTempFiles().  Alternatively, it can be
- * inside a directory created with PathNameCreateTemporaryDir(), in which case
- * the prefix isn't needed.
+ * and deleted by RemovePgTempFiles().  Alternatively, it can be inside a
+ * directory created with PathNameCreateTemporaryDir(), in which case the prefix
+ * isn't needed.
  */
 File
 PathNameCreateTemporaryFile(const char *path, bool error_on_failure)
@@ -3175,7 +3178,8 @@ RemovePgTempFiles(bool stage, bool remove_relation_files)
 	 */
 	spc_dir = AllocateDir("pg_tblspc");
 
-	while ((spc_de = ReadDirExtended(spc_dir, "pg_tblspc", LOG)) != NULL)
+	while (!ShutdownRequestPending &&
+		   (spc_de = ReadDirExtended(spc_dir, "pg_tblspc", LOG)) != NULL)
 	{
 		if (strcmp(spc_de->d_name, ".") == 0 ||
 			strcmp(spc_de->d_name, "..") == 0)
@@ -3211,6 +3215,14 @@ RemovePgTempFiles(bool stage, bool remove_relation_files)
 	 * would create a race condition.  It's done separately, earlier in
 	 * postmaster startup.
 	 */
+
+	/*
+	 * If we just staged some pgsql_tmp directories for removal, wake up the
+	 * custodian process so that it deletes all the files in the staged
+	 * directories as well as the directories themselves.
+	 */
+	if (stage && ProcGlobal->custodianLatch)
+		SetLatch(ProcGlobal->custodianLatch);
 }
 
 /*
@@ -3315,7 +3327,8 @@ RemoveStagedPgTempDirs(const char *spc_dir)
 	struct dirent *de;
 
 	dir = AllocateDir(spc_dir);
-	while ((de = ReadDirExtended(dir, spc_dir, LOG)) != NULL)
+	while (!ShutdownRequestPending &&
+		   (de = ReadDirExtended(dir, spc_dir, LOG)) != NULL)
 	{
 		if (strncmp(de->d_name, PG_TEMP_DIR_TO_REMOVE_PREFIX,
 					strlen(PG_TEMP_DIR_TO_REMOVE_PREFIX)) != 0)
@@ -3354,7 +3367,8 @@ RemovePgTempDir(const char *tmpdirname, bool missing_ok, bool unlink_all)
 	if (temp_dir == NULL && errno == ENOENT && missing_ok)
 		return;
 
-	while ((temp_de = ReadDirExtended(temp_dir, tmpdirname, LOG)) != NULL)
+	while (!ShutdownRequestPending &&
+		   (temp_de = ReadDirExtended(temp_dir, tmpdirname, LOG)) != NULL)
 	{
 		if (strcmp(temp_de->d_name, ".") == 0 ||
 			strcmp(temp_de->d_name, "..") == 0)
-- 
2.25.1

>From 9c2013d53cc5c857ef8aca3df044613e66215aee Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Sun, 5 Dec 2021 22:02:40 -0800
Subject: [PATCH v5 5/8] Move removal of old serialized snapshots to custodian.

This was only done during checkpoints because it was a convenient
place to put it.  However, if there are many snapshots to remove,
it can significantly extend checkpoint time.  To avoid this, move
this work to the newly-introduced custodian process.
---
 src/backend/access/transam/xlog.c           |  2 --
 src/backend/postmaster/custodian.c          | 11 +++++++++++
 src/backend/replication/logical/snapbuild.c | 13 +++++++------
 src/include/replication/snapbuild.h         |  2 +-
 4 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index ce78ac413e..c4a80ea82a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -79,7 +79,6 @@
 #include "replication/logical.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
-#include "replication/snapbuild.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
@@ -6807,7 +6806,6 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 {
 	CheckPointRelationMap();
 	CheckPointReplicationSlots();
-	CheckPointSnapBuild();
 	CheckPointLogicalRewriteHeap();
 	CheckPointReplicationOrigin();
 
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index 5bad0af474..8591c5db9b 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -40,6 +40,7 @@
 #include "pgstat.h"
 #include "postmaster/custodian.h"
 #include "postmaster/interrupt.h"
+#include "replication/snapbuild.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
 #include "storage/fd.h"
@@ -208,6 +209,16 @@ CustodianMain(void)
 		 */
 		RemovePgTempFiles(false, false);
 
+		/*
+		 * Remove serialized snapshots that are no longer required by any
+		 * logical replication slot.
+		 *
+		 * It is not important for these to be removed in single-user mode, so
+		 * we don't need any extra handling outside of the custodian process for
+		 * this.
+		 */
+		RemoveOldSerializedSnapshots();
+
 		/* Calculate how long to sleep */
 		end_time = (pg_time_t) time(NULL);
 		elapsed_secs = end_time - start_time;
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 83fca8a77d..466a6478f3 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -125,6 +125,7 @@
 #include "access/xact.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/interrupt.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
@@ -1912,14 +1913,13 @@ snapshot_not_interesting:
 
 /*
  * Remove all serialized snapshots that are not required anymore because no
- * slot can need them. This doesn't actually have to run during a checkpoint,
- * but it's a convenient point to schedule this.
+ * slot can need them.
  *
- * NB: We run this during checkpoints even if logical decoding is disabled so
- * we cleanup old slots at some point after it got disabled.
+ * NB: We run this even if logical decoding is disabled so we cleanup old slots
+ * at some point after it got disabled.
  */
 void
-CheckPointSnapBuild(void)
+RemoveOldSerializedSnapshots(void)
 {
 	XLogRecPtr	cutoff;
 	XLogRecPtr	redo;
@@ -1942,7 +1942,8 @@ CheckPointSnapBuild(void)
 		cutoff = redo;
 
 	snap_dir = AllocateDir("pg_logical/snapshots");
-	while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
+	while (!ShutdownRequestPending &&
+		   (snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
 	{
 		uint32		hi;
 		uint32		lo;
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index d179251aad..55a2beb434 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -57,7 +57,7 @@ struct ReorderBuffer;
 struct xl_heap_new_cid;
 struct xl_running_xacts;
 
-extern void CheckPointSnapBuild(void);
+extern void RemoveOldSerializedSnapshots(void);
 
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
 										  TransactionId xmin_horizon, XLogRecPtr start_lsn,
-- 
2.25.1

>From 2a9c103b9ce034647ec878da10c7b194ccebea20 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Sun, 12 Dec 2021 22:07:11 -0800
Subject: [PATCH v5 6/8] Move removal of old logical rewrite mapping files to
 custodian.

If there are many such files to remove, checkpoints can take much
longer.  To avoid this, move this work to the newly-introduced
custodian process.
---
 src/backend/access/heap/rewriteheap.c | 83 +++++++++++++++++++++++----
 src/backend/postmaster/checkpointer.c | 33 +++++++++++
 src/backend/postmaster/custodian.c    | 10 ++++
 src/include/access/rewriteheap.h      |  1 +
 src/include/postmaster/bgwriter.h     |  3 +
 5 files changed, 120 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 2a53826736..c5a1103687 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -116,10 +116,13 @@
 #include "lib/ilist.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/bgwriter.h"
+#include "postmaster/interrupt.h"
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
+#include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
@@ -1182,7 +1185,8 @@ heap_xlog_logical_rewrite(XLogReaderState *r)
  * Perform a checkpoint for logical rewrite mappings
  *
  * This serves two tasks:
- * 1) Remove all mappings not needed anymore based on the logical restart LSN
+ * 1) Alert the custodian to remove all mappings not needed anymore based on the
+ *    logical restart LSN
  * 2) Flush all remaining mappings to disk, so that replay after a checkpoint
  *	  only has to deal with the parts of a mapping that have been written out
  *	  after the checkpoint started.
@@ -1210,6 +1214,11 @@ CheckPointLogicalRewriteHeap(void)
 	if (cutoff != InvalidXLogRecPtr && redo < cutoff)
 		cutoff = redo;
 
+	/* let the custodian know what it can remove */
+	CheckPointSetLogicalRewriteCutoff(cutoff);
+	if (ProcGlobal->custodianLatch)
+		SetLatch(ProcGlobal->custodianLatch);
+
 	mappings_dir = AllocateDir("pg_logical/mappings");
 	while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
 	{
@@ -1240,15 +1249,7 @@ CheckPointLogicalRewriteHeap(void)
 
 		lsn = ((uint64) hi) << 32 | lo;
 
-		if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
-		{
-			elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
-			if (unlink(path) < 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not remove file \"%s\": %m", path)));
-		}
-		else
+		if (lsn >= cutoff && cutoff != InvalidXLogRecPtr)
 		{
 			/* on some operating systems fsyncing a file requires O_RDWR */
 			int			fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
@@ -1286,3 +1287,65 @@ CheckPointLogicalRewriteHeap(void)
 	/* persist directory entries to disk */
 	fsync_fname("pg_logical/mappings", true);
 }
+
+/*
+ * Remove every logical rewrite mapping file older than the cutoff most
+ * recently published by CheckPointLogicalRewriteHeap() (all of them, if that
+ * cutoff is InvalidXLogRecPtr).  The published value is used rather than
+ * ReplicationSlotsComputeLogicalRestartLSN() so that we don't interfere with
+ * a concurrent checkpoint that is flushing mappings to disk.
+ */
+void
+RemoveOldLogicalRewriteMappings(void)
+{
+	const char *dirname = "pg_logical/mappings";
+	bool		have_cutoff = false;
+	XLogRecPtr	cutoff = CheckPointGetLogicalRewriteCutoff(&have_cutoff);
+	char		path[MAXPGPATH + 20];
+	DIR		   *dir;
+	struct dirent *de;
+
+	/* nothing to do until the checkpointer has told us what is removable */
+	if (!have_cutoff)
+		return;
+
+	dir = AllocateDir(dirname);
+	while (!ShutdownRequestPending && (de = ReadDir(dir, dirname)) != NULL)
+	{
+		struct stat st;
+		Oid			dboid,
+					relid;
+		TransactionId rewrite_xid,
+					create_xid;
+		uint32		hi,
+					lo;
+		XLogRecPtr	lsn;
+
+		if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
+			continue;
+
+		snprintf(path, sizeof(path), "%s/%s", dirname, de->d_name);
+
+		/* only regular files named "map-..." can be rewrite mappings */
+		if (lstat(path, &st) == 0 && !S_ISREG(st.st_mode))
+			continue;
+		if (strncmp(de->d_name, "map-", 4) != 0)
+			continue;
+
+		if (sscanf(de->d_name, LOGICAL_REWRITE_FORMAT, &dboid, &relid,
+				   &hi, &lo, &rewrite_xid, &create_xid) != 6)
+			elog(ERROR, "could not parse filename \"%s\"", de->d_name);
+
+		/* keep anything at or past a valid cutoff */
+		lsn = ((uint64) hi) << 32 | lo;
+		if (cutoff != InvalidXLogRecPtr && lsn >= cutoff)
+			continue;
+
+		elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
+		if (unlink(path) < 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not remove file \"%s\": %m", path)));
+	}
+	FreeDir(dir);
+}
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 4488e3a443..666f2a0368 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -128,6 +128,9 @@ typedef struct
 	uint32		num_backend_writes; /* counts user backend buffer writes */
 	uint32		num_backend_fsync;	/* counts user backend fsync calls */
 
+	XLogRecPtr	logical_rewrite_mappings_cutoff;	/* can remove older mappings */
+	bool		logical_rewrite_mappings_cutoff_set;
+
 	int			num_requests;	/* current # of requests */
 	int			max_requests;	/* allocated array size */
 	CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
@@ -1342,3 +1345,33 @@ FirstCallSinceLastCheckpoint(void)
 
 	return FirstCall;
 }
+
+/*
+ * Publish the cutoff computed by CheckPointLogicalRewriteHeap() so that the
+ * custodian knows which logical rewrite mapping files it may remove.
+ */
+void
+CheckPointSetLogicalRewriteCutoff(XLogRecPtr cutoff)
+{
+	SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
+	CheckpointerShmem->logical_rewrite_mappings_cutoff_set = true;
+	CheckpointerShmem->logical_rewrite_mappings_cutoff = cutoff;
+	SpinLockRelease(&CheckpointerShmem->ckpt_lck);
+}
+
+/*
+ * Fetch the cutoff last published by CheckPointSetLogicalRewriteCutoff(), for
+ * use by the custodian.  *value_set reports whether a cutoff has been published.
+ */
+XLogRecPtr
+CheckPointGetLogicalRewriteCutoff(bool *value_set)
+{
+	XLogRecPtr	result;
+
+	SpinLockAcquire(&CheckpointerShmem->ckpt_lck);
+	*value_set = CheckpointerShmem->logical_rewrite_mappings_cutoff_set;
+	result = CheckpointerShmem->logical_rewrite_mappings_cutoff;
+	SpinLockRelease(&CheckpointerShmem->ckpt_lck);
+
+	return result;
+}
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index 8591c5db9b..7f914a617f 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -36,6 +36,7 @@
 
 #include <time.h>
 
+#include "access/rewriteheap.h"
 #include "libpq/pqsignal.h"
 #include "pgstat.h"
 #include "postmaster/custodian.h"
@@ -219,6 +220,15 @@ CustodianMain(void)
 		 */
 		RemoveOldSerializedSnapshots();
 
+		/*
+		 * Remove logical rewrite mapping files that are no longer needed.
+		 *
+		 * It is not important for these to be removed in single-user mode, so
+		 * we don't need any extra handling outside of the custodian process for
+		 * this.
+		 */
+		RemoveOldLogicalRewriteMappings();
+
 		/* Calculate how long to sleep */
 		end_time = (pg_time_t) time(NULL);
 		elapsed_secs = end_time - start_time;
diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h
index aa5c48f219..f493094557 100644
--- a/src/include/access/rewriteheap.h
+++ b/src/include/access/rewriteheap.h
@@ -53,5 +53,6 @@ typedef struct LogicalRewriteMappingData
  */
 #define LOGICAL_REWRITE_FORMAT "map-%x-%x-%X_%X-%x-%x"
 void		CheckPointLogicalRewriteHeap(void);
+void		RemoveOldLogicalRewriteMappings(void);
 
 #endif							/* REWRITE_HEAP_H */
diff --git a/src/include/postmaster/bgwriter.h b/src/include/postmaster/bgwriter.h
index 2882efd67b..051e6732cb 100644
--- a/src/include/postmaster/bgwriter.h
+++ b/src/include/postmaster/bgwriter.h
@@ -42,4 +42,7 @@ extern void CheckpointerShmemInit(void);
 
 extern bool FirstCallSinceLastCheckpoint(void);
 
+extern void CheckPointSetLogicalRewriteCutoff(XLogRecPtr cutoff);
+extern XLogRecPtr CheckPointGetLogicalRewriteCutoff(bool *value_set);
+
 #endif							/* _BGWRITER_H */
-- 
2.25.1

>From cfca62dd55d7be7e0025e5625f18d3ab9180029c Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossa...@amazon.com>
Date: Mon, 13 Dec 2021 20:20:12 -0800
Subject: [PATCH v5 7/8] Use syncfs() in CheckPointLogicalRewriteHeap() for
 shutdown and end-of-recovery checkpoints.

This may save quite a bit of time when there are many mapping files
to flush to disk.
---
 src/backend/access/heap/rewriteheap.c | 35 ++++++++++++++++++++++++++-
 src/backend/access/transam/xlog.c     |  2 +-
 src/include/access/rewriteheap.h      |  2 +-
 3 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index c5a1103687..1a8621c0ef 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -1193,7 +1193,7 @@ heap_xlog_logical_rewrite(XLogReaderState *r)
  * ---
  */
 void
-CheckPointLogicalRewriteHeap(void)
+CheckPointLogicalRewriteHeap(bool shutdown)
 {
 	XLogRecPtr	cutoff;
 	XLogRecPtr	redo;
@@ -1219,6 +1219,39 @@ CheckPointLogicalRewriteHeap(void)
 	if (ProcGlobal->custodianLatch)
 		SetLatch(ProcGlobal->custodianLatch);
 
+#ifdef HAVE_SYNCFS
+
+	/*
+	 * If we are doing a shutdown or end-of-recovery checkpoint, let's use
+	 * syncfs() to flush the mappings to disk instead of flushing each one
+	 * individually.  This may save us quite a bit of time when there are many
+	 * such files to flush.
+	 */
+	if (shutdown)
+	{
+		int		fd;
+
+		fd = OpenTransientFile("pg_logical/mappings", O_RDONLY);
+		if (fd < 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"pg_logical/mappings\": %m")));
+
+		if (syncfs(fd) < 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not synchronize file system for file \"pg_logical/mappings\": %m")));
+
+		if (CloseTransientFile(fd) != 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not close file \"pg_logical/mappings\": %m")));
+
+		return;
+	}
+
+#endif							/* HAVE_SYNCFS */
+
 	mappings_dir = AllocateDir("pg_logical/mappings");
 	while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
 	{
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index c4a80ea82a..6a3613fd98 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6806,7 +6806,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 {
 	CheckPointRelationMap();
 	CheckPointReplicationSlots();
-	CheckPointLogicalRewriteHeap();
+	CheckPointLogicalRewriteHeap(flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY));
 	CheckPointReplicationOrigin();
 
 	/* Write out all dirty data in SLRUs and the main buffer pool */
diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h
index f493094557..79cae034e5 100644
--- a/src/include/access/rewriteheap.h
+++ b/src/include/access/rewriteheap.h
@@ -52,7 +52,7 @@ typedef struct LogicalRewriteMappingData
  * ---
  */
 #define LOGICAL_REWRITE_FORMAT "map-%x-%x-%X_%X-%x-%x"
-void		CheckPointLogicalRewriteHeap(void);
+void		CheckPointLogicalRewriteHeap(bool shutdown);
 void		RemoveOldLogicalRewriteMappings(void);
 
 #endif							/* REWRITE_HEAP_H */
-- 
2.25.1

>From b5923b1b76a1fab6c21d6aec086219160473f464 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Fri, 11 Feb 2022 09:43:57 -0800
Subject: [PATCH v5 8/8] Move removal of spilled logical slot data to
 custodian.

If there are many such files, startup can take much longer than
necessary.  To handle this, startup creates a new slot directory,
copies the state file, and swaps the new directory with the old
one.  The custodian then asynchronously cleans up the old slot
directory.
---
 src/backend/access/transam/xlog.c             |  15 +-
 src/backend/postmaster/custodian.c            |  14 +
 .../replication/logical/reorderbuffer.c       | 292 +++++++++++++++++-
 src/backend/replication/slot.c                |   4 +
 src/include/replication/reorderbuffer.h       |   1 +
 5 files changed, 317 insertions(+), 9 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6a3613fd98..36ba3ab147 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5045,18 +5045,21 @@ StartupXLOG(void)
 	 */
 	RelationCacheInitFileRemove();
 
-	/*
-	 * Initialize replication slots, before there's a chance to remove
-	 * required resources.
-	 */
-	StartupReplicationSlots();
-
 	/*
 	 * Startup logical state, needs to be setup now so we have proper data
 	 * during crash recovery.
+	 *
+	 * NB: This also performs some important cleanup that must be done prior to
+	 * other replication slot steps (e.g., StartupReplicationSlots()).
 	 */
 	StartupReorderBuffer();
 
+	/*
+	 * Initialize replication slots, before there's a chance to remove
+	 * required resources.
+	 */
+	StartupReplicationSlots();
+
 	/*
 	 * Startup CLOG. This must be done after ShmemVariableCache->nextXid has
 	 * been initialized and before we accept connections or begin WAL replay.
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index 7f914a617f..8cf237e63f 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -41,6 +41,7 @@
 #include "pgstat.h"
 #include "postmaster/custodian.h"
 #include "postmaster/interrupt.h"
+#include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
@@ -210,6 +211,19 @@ CustodianMain(void)
 		 */
 		RemovePgTempFiles(false, false);
 
+		/*
+		 * Remove any replication slot directories that have been staged for
+		 * deletion.  Since slot directories can accumulate many files, removing
+		 * all of the files during startup (which we used to do) can take a very
+		 * long time.  To avoid delaying startup, we simply have startup rename
+		 * the slot directories, and we clean them up here.
+		 *
+		 * Replication slot directories are not staged or cleaned in single-user
+		 * mode, so we don't need any extra handling outside of the custodian
+		 * process for this.
+		 */
+		RemoveStagedSlotDirectories();
+
 		/*
 		 * Remove serialized snapshots that are no longer required by any
 		 * logical replication slot.
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c2d9be81fa..ab51e41229 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -126,15 +126,19 @@
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
 #include "commands/sequence.h"
+#include "common/string.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/interrupt.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
 #include "replication/slot.h"
 #include "replication/snapbuild.h"	/* just for SnapBuildSnapDecRefcount */
 #include "storage/bufmgr.h"
+#include "storage/copydir.h"
 #include "storage/fd.h"
+#include "storage/proc.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
 #include "utils/combocid.h"
@@ -297,12 +301,15 @@ static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn
 static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 									 bool txn_prepared);
 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
+static void ReorderBufferCleanup(const char *slotname);
 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
 										TransactionId xid, XLogSegNo segno);
 
 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
 static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
 									  ReorderBufferTXN *txn, CommandId cid);
+static void StageSlotDirForRemoval(const char *slotname, const char *slotpath);
+static void RemoveStagedSlotDirectory(const char *path);
 
 /*
  * ---------------------------------------
@@ -4835,6 +4842,202 @@ ReorderBufferCleanupSerializedTXNs(const char *slotname)
 	FreeDir(spill_dir);
 }
 
+/*
+ * Clean up everything in the slot directory except for the "state" file: copy
+ * "state" into a fresh ".new" directory, stage the old directory for removal
+ * by the custodian, and rename ".new" into place.  This is specially written
+ * for StartupReorderBuffer(), which handles crashes at inconvenient times.
+ *
+ * NB: If anything except for the "state" file must survive startup, this
+ * will need to be updated.
+ */
+static void
+ReorderBufferCleanup(const char *slotname)
+{
+	char		path[MAXPGPATH];
+	char		newpath[MAXPGPATH];
+	char		statepath[MAXPGPATH];
+	char		newstatepath[MAXPGPATH];
+	struct stat statbuf;
+
+	sprintf(path, "pg_replslot/%s", slotname);
+	sprintf(newpath, "pg_replslot/%s.new", slotname);
+	sprintf(statepath, "pg_replslot/%s/state", slotname);
+	sprintf(newstatepath, "pg_replslot/%s.new/state", slotname);
+
+	/* we're only handling directories here, skip if it's not ours */
+	if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
+		return;
+
+	/*
+	 * Build our new slot directory, suffixed with ".new".  The caller (likely
+	 * StartupReorderBuffer()) should have already ensured that any pre-existing
+	 * ".new" directories leftover after a crash have been cleaned up.
+	 */
+	if (MakePGDirectory(newpath) < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not create directory \"%s\": %m", newpath)));
+
+	copy_file(statepath, newstatepath);
+
+	fsync_fname(newstatepath, false);
+	fsync_fname(newpath, true);
+	fsync_fname("pg_replslot", true);
+
+	/*
+	 * Move the slot directory aside for cleanup by the custodian.  After this
+	 * step, there will be no slot directory.  StartupReorderBuffer() has
+	 * special logic to make sure we don't lose the slot if we crash at this
+	 * point.
+	 */
+	StageSlotDirForRemoval(slotname, path);
+
+	/* Move our ".new" directory to become our new slot directory. */
+	if (rename(newpath, path) != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not rename file \"%s\" to \"%s\": %m",
+						newpath, path)));
+
+	fsync_fname(path, true);
+	fsync_fname("pg_replslot", true);
+}
+
+/*
+ * Rename the given slot directory to "<slotname>.to_remove_<n>", a name that
+ * RemoveStagedSlotDirectories() in the custodian knows to look for.  The
+ * integer suffix lets us stage a slot again even if previously staged
+ * directories have not yet been removed.
+ */
+static void
+StageSlotDirForRemoval(const char *slotname, const char *slotpath)
+{
+	char		stage_path[MAXPGPATH];
+
+	/*
+	 * Find a name for the stage directory.  We just increment an integer at the
+	 * end of the name until we find one that doesn't exist.  Stopping below
+	 * INT_MAX keeps n++ from overflowing and makes the check below reachable.
+	 */
+	for (int n = 0; n < INT_MAX; n++)
+	{
+		DIR		   *dir;
+
+		sprintf(stage_path, "pg_replslot/%s.to_remove_%d", slotname, n);
+
+		dir = AllocateDir(stage_path);
+		if (dir == NULL)
+		{
+			if (errno == ENOENT)
+				break;
+
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open directory \"%s\": %m",
+							stage_path)));
+		}
+		FreeDir(dir);
+
+		stage_path[0] = '\0';
+	}
+
+	/*
+	 * In the unlikely event that we couldn't find a name for the stage
+	 * directory, bail out.
+	 */
+	if (stage_path[0] == '\0')
+		ereport(ERROR,
+				(errmsg("could not stage \"%s\" for deletion",
+						slotpath)));
+
+	/* Rename the slot directory to its staged name. */
+	if (rename(slotpath, stage_path) != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not rename file \"%s\" to \"%s\": %m",
+						slotpath, stage_path)));
+
+	fsync_fname(stage_path, true);
+	fsync_fname("pg_replslot", true);
+}
+
+/*
+ * Remove the directories under pg_replslot whose names contain ".to_remove",
+ * i.e., those staged for deletion via ReorderBufferCleanup().
+ */
+void
+RemoveStagedSlotDirectories(void)
+{
+	DIR		   *dir;
+	struct dirent *de;
+
+	dir = AllocateDir("pg_replslot");
+	while (!ShutdownRequestPending &&
+		   (de = ReadDir(dir, "pg_replslot")) != NULL)
+	{
+		struct stat st;
+		char		path[MAXPGPATH];
+
+		if (strstr(de->d_name, ".to_remove") == NULL)
+			continue;
+
+		sprintf(path, "pg_replslot/%s", de->d_name);
+		if (lstat(path, &st) != 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m", path)));
+
+		/* only directories can have been staged; ignore anything else */
+		if (S_ISDIR(st.st_mode))
+			RemoveStagedSlotDirectory(path);
+	}
+	FreeDir(dir);
+}
+
+/*
+ * Recursively remove one slot directory staged by ReorderBufferCleanup().  If
+ * a shutdown request is pending, stop as soon as possible and leave the
+ * (possibly non-empty) directory for a later pass rather than fail in rmdir().
+ */
+static void
+RemoveStagedSlotDirectory(const char *path)
+{
+	DIR		   *dir;
+	struct dirent *de;
+
+	dir = AllocateDir(path);
+	while (!ShutdownRequestPending &&
+		   (de = ReadDir(dir, path)) != NULL)
+	{
+		struct stat st;
+		char		filepath[MAXPGPATH];
+
+		if (strcmp(de->d_name, ".") == 0 ||
+			strcmp(de->d_name, "..") == 0)
+			continue;
+
+		sprintf(filepath, "%s/%s", path, de->d_name);
+
+		if (lstat(filepath, &st) < 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m", filepath)));
+		else if (S_ISDIR(st.st_mode))
+			RemoveStagedSlotDirectory(filepath);
+		else if (unlink(filepath) < 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not remove file \"%s\": %m", filepath)));
+	}
+	FreeDir(dir);
+
+	if (!ShutdownRequestPending && rmdir(path) < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not remove directory \"%s\": %m", path)));
+}
+
 /*
  * Given a replication slot, transaction ID and segment number, fill in the
  * corresponding spill file into 'path', which is a caller-owned buffer of size
@@ -4863,6 +5066,83 @@ StartupReorderBuffer(void)
 	DIR		   *logical_dir;
 	struct dirent *logical_de;
 
+	/*
+	 * First, handle any ".new" directories that were leftover after a crash.
+	 * These are created and swapped with the actual replication slot
+	 * directories so that cleanup of spilled data can be done asynchronously by
+	 * the custodian.
+	 */
+	logical_dir = AllocateDir("pg_replslot");
+	while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
+	{
+		char		name[NAMEDATALEN];
+		char		path[NAMEDATALEN + 12];
+		struct stat statbuf;
+
+		if (strcmp(logical_de->d_name, ".") == 0 ||
+			strcmp(logical_de->d_name, "..") == 0)
+			continue;
+
+		/*
+		 * Make sure it's a valid ".new" directory.
+		 */
+		if (!pg_str_endswith(logical_de->d_name, ".new") ||
+			strlen(logical_de->d_name) >= NAMEDATALEN + 4)
+			continue;
+
+		strncpy(name, logical_de->d_name, sizeof(name));
+		name[strlen(logical_de->d_name) - 4] = '\0';
+		if (!ReplicationSlotValidateName(name, DEBUG2))
+			continue;
+
+		sprintf(path, "pg_replslot/%s", name);
+		if (lstat(path, &statbuf) == 0)
+		{
+			if (!S_ISDIR(statbuf.st_mode))
+				continue;
+
+			/*
+			 * If the original directory still exists, just delete the ".new"
+			 * directory.  We'll try again when we call ReorderBufferCleanup()
+			 * later on.
+			 */
+			if (!rmtree(path, true))
+				ereport(ERROR,
+						(errmsg("could not remove directory \"%s\"", path)));
+		}
+		else if (errno == ENOENT)
+		{
+			char		newpath[NAMEDATALEN + 16];
+
+			/*
+			 * If the original directory is gone, we need to rename the ".new"
+			 * directory to take its place.  We know that the ".new" directory
+			 * is ready to be the real deal if we previously made it far enough
+			 * to delete the original directory.
+			 */
+			sprintf(newpath, "pg_replslot/%s", logical_de->d_name);
+			if (rename(newpath, path) != 0)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not rename file \"%s\" to \"%s\": %m",
+								newpath, path)));
+
+			fsync_fname(path, true);
+		}
+		else
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m", path)));
+
+		fsync_fname("pg_replslot", true);
+	}
+	FreeDir(logical_dir);
+
+	/*
+	 * Now we can proceed with deleting all spilled data.  (This actually just
+	 * moves the directories aside so that the custodian can clean it up
+	 * asynchronously.)
+	 */
 	logical_dir = AllocateDir("pg_replslot");
 	while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
 	{
@@ -4875,12 +5155,18 @@ StartupReorderBuffer(void)
 			continue;
 
 		/*
-		 * ok, has to be a surviving logical slot, iterate and delete
-		 * everything starting with xid-*
+		 * ok, has to be a surviving logical slot, delete everything except for
+		 * state
 		 */
-		ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
+		ReorderBufferCleanup(logical_de->d_name);
 	}
 	FreeDir(logical_dir);
+
+	/*
+	 * Wake up the custodian so it cleans up our old slot data.
+	 */
+	if (ProcGlobal->custodianLatch)
+		SetLatch(ProcGlobal->custodianLatch);
 }
 
 /* ---------------------------------------
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 5da5fa825a..fd48d82718 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1455,6 +1455,10 @@ StartupReplicationSlots(void)
 			continue;
 		}
 
+		/* if it's an old slot directory that's staged for removal, ignore it */
+		if (strstr(replication_de->d_name, ".to_remove") != NULL)
+			continue;
+
 		/* looks like a slot in a normal state, restore */
 		RestoreSlotFromDisk(replication_de->d_name);
 	}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 859424bbd9..ff56ae0b22 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -719,6 +719,7 @@ TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
 void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
 void		StartupReorderBuffer(void);
+void		RemoveStagedSlotDirectories(void);
 
 bool		ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
 												 RelFileNode rnode, bool created);
-- 
2.25.1

Reply via email to