Hello everyone in this thread!

On 2021-03-18 18:04, David Steele wrote:
Seems like there should have been a patch attached?

IMO there's a technical problem with sending, receiving (or displaying on the site) emails from the pgsql-hackers list. Being subscribed to this list, I received the attached patch with the email [1]. And my colleague Roman Zharkov said that the 'Resend email' button on that page helped him to receive the email with the attached patch. On the other hand, following this link in the browser I do not see the attached patch. Do you think it is worth writing about this issue to webmaster(dot)postgresql(dot)org?

Just in case I'm lucky this email contains the lost patch.

[1] https://www.postgresql.org/message-id/4047CC05-1AF5-454B-850B-ED37374A2AC0%40postgrespro.ru

--
Marina Polyakova
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
From c071e8ee78aac811feaf54c4374c1a998409733e Mon Sep 17 00:00:00 2001
From: Dmitry Shulga <d.shu...@postgrespro.ru>
Date: Fri, 18 Dec 2020 12:38:58 +0700
Subject: [PATCH] Reduce time required to recover database from archive.

Originally, recovering a database from archive was performed by
sequentially receiving files with WAL records and applying them against
the database. Delivery of files containing WAL records is performed
by running the command specified by the GUC parameter restore_command.
If receiving each file containing WAL records takes a long time,
recovery stands idle most of the time, waiting until files are received.
If the time required to apply WAL records from an archive file is significantly
less than the time required to deliver the file from archive, it leads
to nonproductive idling after the current WAL segment is applied and
before the next WAL segment is received.  As a consequence, the wall time
required to recover a database from archive log can be unacceptably long.

To reduce total time required to restore database from archive the procedure
for delivering of WAL files was redesigned in order to allow concurrent
loading of WAL files. At postmaster start a few background processes are
spawned to load WAL files from archive in parallel. A number of processes
that started to perform preloading of WAL files is determined by
the new GUC parameter wal_prefetch_workers. A number of WAL files to prefetch
from archive is limited by the new GUC parameter wal_max_prefetch_amount.

Additionally, refactoring was done to extract duplicate code, used
in the files xlogarchive.c and xlogrestore.c, into standalone functions and
move them to the file xlogutils.c

Author: Dmitry Shulga
Reviewed-by: Anna Akenteva
Tested-by: Roman Zharkov

diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 595e02de722..ffbf8090f45 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -32,7 +32,8 @@ OBJS = \
 	xlogfuncs.o \
 	xloginsert.o \
 	xlogreader.o \
-	xlogutils.o
+	xlogutils.o \
+	xlogrestore.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 13f1d8c3dc7..f0a0c68725e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -37,6 +37,7 @@
 #include "access/xloginsert.h"
 #include "access/xlogreader.h"
 #include "access/xlogutils.h"
+#include "access/xlogrestore.h"
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
 #include "catalog/pg_database.h"
@@ -3684,10 +3685,11 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
 					 xlogfname);
 			set_ps_display(activitymsg);
 
-			restoredFromArchive = RestoreArchivedFile(path, xlogfname,
-													  "RECOVERYXLOG",
-													  wal_segment_size,
-													  InRedo);
+			restoredFromArchive = RestoreCommandXLog(path, xlogfname,
+													 "RECOVERYXLOG",
+													 wal_segment_size,
+													 InRedo);
+
 			if (!restoredFromArchive)
 				return -1;
 			break;
diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c
index f39dc4ddf1a..fb4023f1cec 100644
--- a/src/backend/access/transam/xlogarchive.c
+++ b/src/backend/access/transam/xlogarchive.c
@@ -22,6 +22,7 @@
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
+#include "access/xlogutils.h"
 #include "common/archive.h"
 #include "miscadmin.h"
 #include "postmaster/startup.h"
@@ -55,13 +56,8 @@ RestoreArchivedFile(char *path, const char *xlogfname,
 					bool cleanupEnabled)
 {
 	char		xlogpath[MAXPGPATH];
-	char	   *xlogRestoreCmd;
 	char		lastRestartPointFname[MAXPGPATH];
 	int			rc;
-	struct stat stat_buf;
-	XLogSegNo	restartSegNo;
-	XLogRecPtr	restartRedoPtr;
-	TimeLineID	restartTli;
 
 	/*
 	 * Ignore restore_command when not in archive recovery (meaning we are in
@@ -102,22 +98,7 @@ RestoreArchivedFile(char *path, const char *xlogfname,
 	/*
 	 * Make sure there is no existing file named recovername.
 	 */
-	if (stat(xlogpath, &stat_buf) != 0)
-	{
-		if (errno != ENOENT)
-			ereport(FATAL,
-					(errcode_for_file_access(),
-					 errmsg("could not stat file \"%s\": %m",
-							xlogpath)));
-	}
-	else
-	{
-		if (unlink(xlogpath) != 0)
-			ereport(FATAL,
-					(errcode_for_file_access(),
-					 errmsg("could not remove file \"%s\": %m",
-							xlogpath)));
-	}
+	FileUnlink(xlogpath);
 
 	/*
 	 * Calculate the archive file cutoff point for use during log shipping
@@ -136,97 +117,28 @@ RestoreArchivedFile(char *path, const char *xlogfname,
 	 * flags to signify the point when we can begin deleting WAL files from
 	 * the archive.
 	 */
-	if (cleanupEnabled)
-	{
-		GetOldestRestartPoint(&restartRedoPtr, &restartTli);
-		XLByteToSeg(restartRedoPtr, restartSegNo, wal_segment_size);
-		XLogFileName(lastRestartPointFname, restartTli, restartSegNo,
-					 wal_segment_size);
-		/* we shouldn't need anything earlier than last restart point */
-		Assert(strcmp(lastRestartPointFname, xlogfname) <= 0);
-	}
-	else
-		XLogFileName(lastRestartPointFname, 0, 0L, wal_segment_size);
+	XLogFileNameLastPoint(lastRestartPointFname, cleanupEnabled);
+	Assert(strcmp(lastRestartPointFname, xlogfname) <= 0);
 
-	/* Build the restore command to execute */
-	xlogRestoreCmd = BuildRestoreCommand(recoveryRestoreCommand,
-										 xlogpath, xlogfname,
-										 lastRestartPointFname);
-	if (xlogRestoreCmd == NULL)
-		elog(ERROR, "could not build restore command \"%s\"",
-			 recoveryRestoreCommand);
-
-	ereport(DEBUG3,
-			(errmsg_internal("executing restore command \"%s\"",
-							 xlogRestoreCmd)));
-
-	/*
-	 * Check signals before restore command and reset afterwards.
-	 */
-	PreRestoreCommand();
-
-	/*
-	 * Copy xlog from archival storage to XLOGDIR
-	 */
-	rc = system(xlogRestoreCmd);
-
-	PostRestoreCommand();
-	pfree(xlogRestoreCmd);
+	rc = DoRestore(xlogpath, xlogfname, lastRestartPointFname);
 
 	if (rc == 0)
 	{
-		/*
-		 * command apparently succeeded, but let's make sure the file is
-		 * really there now and has the correct size.
-		 */
-		if (stat(xlogpath, &stat_buf) == 0)
+		bool		file_not_found;
+		bool		ret = FileValidateSize(xlogpath, expectedSize, xlogfname,
+										   &file_not_found);
+
+		if (ret)
 		{
-			if (expectedSize > 0 && stat_buf.st_size != expectedSize)
-			{
-				int			elevel;
-
-				/*
-				 * If we find a partial file in standby mode, we assume it's
-				 * because it's just being copied to the archive, and keep
-				 * trying.
-				 *
-				 * Otherwise treat a wrong-sized file as FATAL to ensure the
-				 * DBA would notice it, but is that too strong? We could try
-				 * to plow ahead with a local copy of the file ... but the
-				 * problem is that there probably isn't one, and we'd
-				 * incorrectly conclude we've reached the end of WAL and we're
-				 * done recovering ...
-				 */
-				if (StandbyMode && stat_buf.st_size < expectedSize)
-					elevel = DEBUG1;
-				else
-					elevel = FATAL;
-				ereport(elevel,
-						(errmsg("archive file \"%s\" has wrong size: %lld instead of %lld",
-								xlogfname,
-								(long long int) stat_buf.st_size,
-								(long long int) expectedSize)));
-				return false;
-			}
-			else
-			{
-				ereport(LOG,
-						(errmsg("restored log file \"%s\" from archive",
-								xlogfname)));
-				strcpy(path, xlogpath);
-				return true;
-			}
+			ereport(LOG,
+					(errmsg("restored log file \"%s\" from archive",
+							xlogfname)));
+			strcpy(path, xlogpath);
+			return true;
 		}
-		else
-		{
-			/* stat failed */
-			int			elevel = (errno == ENOENT) ? LOG : FATAL;
 
-			ereport(elevel,
-					(errcode_for_file_access(),
-					 errmsg("could not stat file \"%s\": %m", xlogpath),
-					 errdetail("restore_command returned a zero exit status, but stat() failed.")));
-		}
+		if (!file_not_found)
+			return false;
 	}
 
 	/*
diff --git a/src/backend/access/transam/xlogrestore.c b/src/backend/access/transam/xlogrestore.c
new file mode 100644
index 00000000000..4cd87e296ef
--- /dev/null
+++ b/src/backend/access/transam/xlogrestore.c
@@ -0,0 +1,951 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogrestore.c
+ *	  Infrastructure for parallel restore commands execution
+ *
+ * Copyright (c) 2020, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/transam/xlogrestore.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "access/xlogrestore.h"
+
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+#include <unistd.h>
+#include <sys/stat.h>
+
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xlogarchive.h"
+#include "access/xlogdefs.h"
+#include "access/xlogutils.h"
+#include "common/archive.h"
+#include "common/file_perm.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "port.h"
+#include "port/atomics.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/startup.h"
+#include "storage/ipc.h"
+#include "storage/spin.h"
+#include "storage/shmem.h"
+#include "storage/latch.h"
+#include "storage/lock.h"
+#include "tcop/tcopprot.h"
+#include "utils/timestamp.h"
+#include "utils/memutils.h"
+
+/*
+ * The max number of WAL files to prefetch from archive.
+ */
+int			wal_max_prefetch_amount;
+
+/*
+ * Number of background workers to run on postmaster startup for retrieving
+ * WAL files from archive. Zero value of this variable turns off prefetching of
+ * WAL files from archive.
+ */
+int			wal_prefetch_workers;
+
+/*
+ * Per-worker data of a WAL prefetching bgworker.
+ */
+typedef struct RestoreSlot
+{
+	/*
+	 * The handle obtained when the bgworker is registered.
+	 */
+	BackgroundWorkerHandle *bgwhandle;
+
+	/*
+	 * The latch used for signaling that bgworker can continue downloading of
+	 * WAL files since a number of already pre-fetched WAL files dropped below
+	 * the limit imposed by the GUC parameter wal_max_prefetch_amount.
+	 */
+	Latch		continuePrefetching;
+
+	/*
+	 * The latch set by the bgworker process at startup to notify its invoker
+	 * that the worker is running.
+	 */
+	Latch		workerReady;
+
+	/*
+	 * This flag is set to true by the bgworker process at startup, right
+	 * before it sets the workerReady latch.
+	 */
+	bool		workerStarted;
+} RestoreSlot;
+
+typedef struct PrefetchedFile
+{
+	/*
+	 * The name of the archive file %f.
+	 */
+	char		xlogfname[MAXFNAMELEN];
+} PrefetchedFile;
+
+/*
+ * Type of entries stored in the hash table RestoreDataStruct->hashtab
+ */
+typedef struct PrefetchedFileEntry
+{
+	PrefetchedFile key;
+
+	/*
+	 * True if the file named by the key was delivered (set by
+	 * SignalFileDelivered()), false if not (set by SignalFileNotExist()).
+	 */
+	bool		file_exist;
+
+	/*
+	 * Initially false; set to true by RestoreCommandXLog() once the file
+	 * named by the key has been handed over to the recovery procedure.
+	 */
+	bool		file_was_processed;
+} PrefetchedFileEntry;
+
+typedef struct RestoreDataStruct
+{
+	/*
+	 * The lock to guard against concurrent modification of structure's
+	 * members by the recovery process and the bgworker processes.
+	 */
+	slock_t		lock;
+
+	/*
+	 * The latch supporting the producer/consumer pattern. Producers are
+	 * bgworker processes pre-fetching WAL files from archive, the consumer
+	 * is the recovery process waiting for the next required WAL file. A
+	 * producer sets this latch every time it has reported a file.
+	 */
+	Latch		fileAvailable;
+
+	/*
+	 * Hash table to trace what WAL files have been pre-fetched.
+	 * NOTE(review): this is a process-local HTAB pointer stored in shared
+	 * memory - confirm it is valid in every process (e.g. EXEC_BACKEND).
+	 */
+	HTAB	   *hashtab;
+
+	/*
+	 * The name of the last recovery point file %r.
+	 */
+	char		pointfname[MAXFNAMELEN];
+
+	/*
+	 * TLI and an initial segment number from which to start a database
+	 * recovery
+	 */
+	XLogSegNo	restartSegNo;
+	TimeLineID	restartTli;
+
+	/*
+	 * Number of pre-fetched WAL files not yet consumed by recovery.
+	 */
+	int			nprefetched;
+
+	/*
+	 * Data for background workers, wal_prefetch_workers entries.
+	 */
+	RestoreSlot slots[FLEXIBLE_ARRAY_MEMBER];
+
+} RestoreDataStruct;
+
+static RestoreDataStruct *RestoreData = NULL;
+
+typedef enum WALPrefetchingState_e
+{
+	WALPrefetchingIsInactive,
+	WALPrefetchingIsActive,
+	WALPrefetchingShutdown
+} WALPrefetchingState_e;
+
+static WALPrefetchingState_e WALPrefetchingState = WALPrefetchingIsInactive;
+
+static void XLogFilePathPrefetch(char *path, const char *xlogfname);
+static bool FilePathExists(const char *xlogpath);
+static void StartWALPrefetchWorkers(const char *xlogfname);
+static void ShutdownWALPrefetchWorkers(int last_process_idx);
+static bool WaitUntilFileRetrieved(const char *xlogfname,
+								   bool *wal_file_processed);
+
+/*
+ * Shared memory needed: RestoreDataStruct plus wal_prefetch_workers slots.
+ */
+Size
+RestoreCommandShmemSize(void)
+{
+	Size		size;
+
+	size = sizeof(RestoreDataStruct);
+	size = MAXALIGN(size);
+	size = add_size(size, mul_size(wal_prefetch_workers, sizeof(RestoreSlot)));
+	return size;
+}
+
+#define PREFETCH_DIR XLOGDIR "/" PG_TEMP_FILES_DIR
+
+/*
+ * Initialize the shared memory used by WAL prefetching and, when it is first
+ * created, create or clear the temporary directory for prefetched files.
+ */
+void
+RestoreCommandShmemInit(void)
+{
+	bool		found;
+
+	RestoreData = (RestoreDataStruct *)
+		ShmemInitStruct("Restore Command Workers Data",
+						RestoreCommandShmemSize(),
+						&found);
+
+	if (!found)
+	{
+		int			i;
+		HASHCTL		hash_ctl;
+
+		memset(RestoreData, 0, RestoreCommandShmemSize());
+
+		SpinLockInit(&RestoreData->lock);
+
+		InitSharedLatch(&RestoreData->fileAvailable);
+
+		/* Create the hash table of pre-fetched file names */
+		memset(&hash_ctl, 0, sizeof(hash_ctl));
+
+		hash_ctl.keysize = sizeof(PrefetchedFile);
+		hash_ctl.entrysize = sizeof(PrefetchedFileEntry);
+
+		RestoreData->hashtab = ShmemInitHash("Pre-fetched WAL files",
+											 wal_max_prefetch_amount,
+											 wal_max_prefetch_amount,
+											 &hash_ctl,
+											 HASH_ELEM);
+
+		/*
+		 * Zero each worker slot and initialize its two latches.
+		 */
+		for (i = 0; i < wal_prefetch_workers; ++i)
+		{
+			RestoreSlot *slot = &RestoreData->slots[i];
+
+			memset(slot, 0, sizeof(RestoreSlot));
+			InitSharedLatch(&slot->continuePrefetching);
+			InitSharedLatch(&slot->workerReady);
+		}
+
+		/* Create or clear the directory for prefetched WAL files. */
+		PathNameCreateTemporaryDir(XLOGDIR, PREFETCH_DIR);
+		RemovePgTempFilesInDir(PREFETCH_DIR, true, true);
+	}
+}
+
+/*
+ * Iterate over the bgworker slots and set the continuePrefetching latch of
+ * every one, so that any bgworker process waiting on it resumes retrieving
+ * WAL files from archive.
+ */
+static void
+ResumePrefetching(void)
+{
+	int			i;
+
+	for (i = 0; i < wal_prefetch_workers; ++i)
+	{
+		SetLatch(&RestoreData->slots[i].continuePrefetching);
+	}
+}
+
+/*
+ * This function is counterpart of RestoreArchivedFile function with xlogs
+ * pre-fetching.
+ *
+ * On success the requested WAL file has been retrieved from archive.
+ * Invocation of this function also initiates loading of WAL files that
+ * will be required later. For this goal several bgworker processes
+ * are started and perform loading of WAL files. A name of file to start
+ * loading is assigned to every background worker process together with
+ * a delta value that will be applied to a segment number of a WAL file just
+ * received in order to calculate a next file name to pre-load.
+ *
+ * A number of background workers started for WAL files loading is determined
+ * by the new GUC parameter wal_prefetch_workers. A number of WAL files to
+ * prefetch is limited by the new GUC parameter wal_max_prefetch_amount.
+ * If wal_max_prefetch_amount has value 0 no background worker processes
+ * are started and WAL files preloading is not performed. In this case regular
+ * (in one by one manner) loading of WAL files is performed.
+ *
+ * Input:
+ *		path - output: path of the restored WAL file (MAXPGPATH buffer)
+ *		xlogfname - a name of WAL file to retrieve from archive
+ *		recovername - the file name under XLOGDIR that the retrieved WAL
+ *					  file is renamed to
+ *		expectedSize - expected size of the requested WAL file
+ *		cleanupEnabled - true if start recovering from last restart point,
+ *						 false if start recovering from the very outset.
+ *	Return:
+ *		true on success, false on error
+ */
+bool
+RestoreCommandXLog(char *path, const char *xlogfname, const char *recovername,
+				   const off_t expectedSize, bool cleanupEnabled)
+{
+	char		xlogpath[MAXPGPATH];
+	bool		prefetchedFileNotFound,
+				wal_file_already_processed;
+	int			nprefetched;
+	PrefetchedFileEntry *foundEntry;
+
+	/*
+	 * Synchronous mode.
+	 */
+	if (wal_max_prefetch_amount < 1)
+		goto fallback;
+
+	/*
+	 * Ignore restore_command when not in archive recovery (meaning we are in
+	 * crash recovery).
+	 */
+	if (!ArchiveRecoveryRequested)
+		goto fallback;
+
+	/*
+	 * In standby mode, restore_command might not be supplied.
+	 */
+	if (recoveryRestoreCommand == NULL ||
+		strcmp(recoveryRestoreCommand, "") == 0)
+		goto fallback;
+
+	/*
+	 * Create the last restart point file name.
+	 */
+	XLogFileNameLastPoint(RestoreData->pointfname, cleanupEnabled);
+
+	/*
+	 * Run WAL pre-fetching processes if they haven't been started yet.
+	 */
+	StartWALPrefetchWorkers(xlogfname);
+
+	/*
+	 * We shouldn't need anything earlier than the last restart point.
+	 */
+	Assert(strcmp(RestoreData->pointfname, xlogfname) <= 0);
+
+	/*
+	 * Make prefetched path for file.
+	 */
+	XLogFilePathPrefetch(xlogpath, xlogfname);
+
+	/*
+	 * Wait until file be retrieved from archive.
+	 */
+	if (!WaitUntilFileRetrieved(xlogfname, &wal_file_already_processed))
+	{
+		/*
+		 * WaitUntilFileRetrieved() returns false in case there is no more WAL
+		 * files to retrieve.
+		 */
+		snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
+		return false;
+	}
+
+	if (wal_file_already_processed)
+		return false;
+
+	/*
+	 * Make sure the file is really there now and has the correct size.
+	 */
+	if (!FileValidateSize(xlogpath, expectedSize, xlogfname,
+						  &prefetchedFileNotFound))
+	{
+		if (prefetchedFileNotFound)
+			snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
+		else
+			/* Remove artifacts. */
+			FileUnlink(xlogpath);
+
+		return false;
+	}
+
+	/*
+	 * Move file to target path.
+	 */
+	snprintf(path, MAXPGPATH, XLOGDIR "/%s", recovername);
+	durable_rename(xlogpath, path, ERROR);
+
+	/*
+	 * Decrease by one a number of prefetched files and mark the file as
+	 * processed, both under the lock.
+	 */
+	SpinLockAcquire(&RestoreData->lock);
+
+	Assert(RestoreData->nprefetched > 0);
+
+	nprefetched = RestoreData->nprefetched;
+	RestoreData->nprefetched = RestoreData->nprefetched - 1;
+
+	foundEntry =
+	(PrefetchedFileEntry *) hash_search(RestoreData->hashtab, xlogfname,
+										HASH_FIND, NULL);
+
+	foundEntry->file_was_processed = true;
+	SpinLockRelease(&RestoreData->lock);
+
+	/*
+	 * If the number of prefetched files just dropped below the limit
+	 * wal_max_prefetch_amount, wake up the workers. Done after releasing the
+	 * spinlock: setting latches must not happen while a spinlock is held.
+	 */
+	if (nprefetched >= wal_max_prefetch_amount &&
+		nprefetched - 1 < wal_max_prefetch_amount)
+
+		/*
+		 * The value of RestoreData->nprefetched dropped below the
+		 * wal_max_prefetch_amount limit, signal background processes to
+		 * continue prefetching of WAL files from archive.
+		 */
+		ResumePrefetching();
+
+	/*
+	 * Log message like in RestoreArchivedFile.
+	 */
+	ereport(LOG,
+			(errmsg("restored log file \"%s\" from archive",
+					xlogfname)));
+	return true;
+
+fallback:
+
+	/*
+	 * Prefetching is disabled or not applicable - use default implementation
+	 */
+	return RestoreArchivedFile(path, xlogfname, recovername, expectedSize,
+							   cleanupEnabled);
+}
+
+/*
+ * Wait until the hash table has an entry for xlogfname, i.e. until a worker
+ * has reported the outcome of restoring that file.
+ *
+ * Input:  xlogfname - a name of file to wait for delivering from archive
+ * Output: *wal_file_processed - the entry's file_was_processed flag
+ * Return:
+ *		the entry's file_exist flag (false if the file was not delivered)
+ */
+static bool
+WaitUntilFileRetrieved(const char *xlogfname, bool *wal_file_processed)
+{
+	bool		found;
+
+	do
+	{
+		PrefetchedFileEntry *foundEntry;
+
+		SpinLockAcquire(&RestoreData->lock);
+
+		/*
+		 * Check whether the file name does exist in the hash table. If it
+		 * does then restore_command was executed on behalf of this file name
+		 * and a file was probably copied to a destination directory. The
+		 * actual presence of the file in the destination directory is
+		 * determined by the data member file_exist of the structure
+		 * PrefetchedFileEntry.
+		 */
+		foundEntry =
+			(PrefetchedFileEntry *) hash_search(RestoreData->hashtab, xlogfname,
+												HASH_FIND, NULL);
+
+		if (foundEntry != NULL)
+		{
+			/*
+			 * The data member file_exist of the structure PrefetchedFileEntry
+			 * has the false value if restore_command was executed but the
+			 * file wasn't copied to a destination directory for some reason,
+			 * e.g. since no more files exist in archive.
+			 */
+			found = foundEntry->file_exist;
+			*wal_file_processed = foundEntry->file_was_processed;
+
+			SpinLockRelease(&RestoreData->lock);
+			break;
+		}
+		SpinLockRelease(&RestoreData->lock);
+
+		/*
+		 * There is no entry in the hash table corresponding to the name
+		 * specified by the parameter xlogfname. Wait on the latch
+		 * RestoreData->fileAvailable located in the shared memory until a
+		 * file be retrieved from archive. bgworker processes run for
+		 * delivering WAL files from archive will trigger this latch every
+		 * time a new WAL file be delivered.
+		 */
+		(void) WaitLatch(&RestoreData->fileAvailable,
+						 WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
+						 0, PG_WAIT_EXTENSION);
+		ResetLatch(&RestoreData->fileAvailable);
+		CHECK_FOR_INTERRUPTS();
+	}
+	while (true);
+
+	return found;
+}
+
+/*
+ * Insert a file name into the hash table and wake up the process that is
+ * waiting until the file is retrieved from archive.
+ *
+ * Input:
+ *		xlogfname - name of pre-fetched file.
+ */
+static void
+SignalFileDelivered(const char *xlogfname)
+{
+	PrefetchedFileEntry newFileEntry;
+	PrefetchedFileEntry *insertedElement;
+
+#ifdef USE_ASSERT_CHECKING
+	bool		found = false;
+#endif
+
+	strcpy(newFileEntry.key.xlogfname, xlogfname);
+
+	SpinLockAcquire(&RestoreData->lock);
+
+	/*
+	 * Add the new file name to the hash of file names that have been already
+	 * delivered from archive. Out of memory error is reported by ereport, so
+	 * it is not required to check the return value of hash_search().
+	 */
+#ifdef USE_ASSERT_CHECKING
+	insertedElement = hash_search(RestoreData->hashtab, &newFileEntry,
+								  HASH_ENTER, &found);
+
+	/*
+	 * An entry with such a key mustn't exist in the hash table at the time
+	 * of insertion. If it does something really wrong happened.
+	 */
+	Assert(!found);
+#else
+	insertedElement = hash_search(RestoreData->hashtab, &newFileEntry,
+								  HASH_ENTER, NULL);
+#endif
+
+	insertedElement->file_exist = true;
+	insertedElement->file_was_processed = false;
+
+	/*
+	 * Increase by one a number of pre-fetched files
+	 */
+	RestoreData->nprefetched = RestoreData->nprefetched + 1;
+
+	SpinLockRelease(&RestoreData->lock);
+
+	/*
+	 * Wake up the process executing RestoreCommandXLog() to continue
+	 * processing of WAL files; done only after the spinlock is released.
+	 */
+	SetLatch(&RestoreData->fileAvailable);
+}
+
+/*
+ * Mark that the specified WAL file doesn't exist in archive and wake up
+ * the process that is waiting in RestoreCommandXLog() to finish WAL file
+ * processing.
+ */
+static void
+SignalFileNotExist(const char *xlogfname)
+{
+	PrefetchedFileEntry *insertedElement;
+	PrefetchedFileEntry newFileEntry;
+#ifdef USE_ASSERT_CHECKING
+	bool		found = false;
+#endif
+
+	strcpy(newFileEntry.key.xlogfname, xlogfname);
+
+	SpinLockAcquire(&RestoreData->lock);
+
+	/*
+	 * Add the file name to the hash table with file_exist = false so that
+	 * the waiter can see the outcome. Out of memory error is reported by
+	 * ereport, so the return value of hash_search() is not checked.
+	 */
+#ifdef USE_ASSERT_CHECKING
+	insertedElement = hash_search(RestoreData->hashtab, &newFileEntry,
+								  HASH_ENTER, &found);
+
+	/*
+	 * We tried to add the new file name and discovered that such file does
+	 * already exist. It seems something wrong happens.
+	 */
+	Assert(!found);
+	insertedElement->file_exist = false;
+	insertedElement->file_was_processed = false;
+
+	Assert(insertedElement->file_exist == false &&
+		   strcmp(insertedElement->key.xlogfname, xlogfname) == 0);
+#else
+	insertedElement = hash_search(RestoreData->hashtab, &newFileEntry,
+								  HASH_ENTER, NULL);
+	insertedElement->file_exist = false;
+	insertedElement->file_was_processed = false;
+#endif
+
+	SpinLockRelease(&RestoreData->lock);
+
+	/*
+	 * Wake up the process executing RestoreCommandXLog() to finish WAL files
+	 * processing; the latch is set only after the spinlock is released.
+	 */
+	SetLatch(&RestoreData->fileAvailable);
+}
+
+/*
+ * Check whether a limit imposed by the GUC parameter wal_max_prefetch_amount
+ * has been reached on prefetching of WAL files and suspend further
+ * downloading of WAL files until a notification be received to resume it.
+ * Input:
+ *			bgwid - an index of bgworker process that has to suspend prefetching of
+ *			WAL files.
+ */
+static void
+SuspendPrefetchingIfRequired(uint16 bgwid)
+{
+	while (RestoreData->nprefetched >= wal_max_prefetch_amount)
+	{
+		/*
+		 * The number of already pre-fetched WAL files has reached the limit
+		 * imposed by the GUC parameter 'wal_max_prefetch_amount': sleep on
+		 * this worker's latch until woken, then re-check the counter.
+		 * NOTE(review): nprefetched is read here without taking the lock.
+		 */
+		(void) WaitLatch(&RestoreData->slots[bgwid].continuePrefetching,
+						 WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
+						 0, PG_WAIT_EXTENSION);
+		ResetLatch(&RestoreData->slots[bgwid].continuePrefetching);
+		CHECK_FOR_INTERRUPTS();
+	}
+}
+
+/*
+ * The main entry point for bgworker.
+ * Input:
+ *		main_arg -	id value assigned to each pre-fetching bgworker process.
+ *				This value is used both as an index in array of active bgworker
+ *				processes and for calculating the number of the first segment
+ *				from which the corresponding bgworker process starts
+ *				pre-fetching WAL files.
+ *
+ * The worker leaves its loop and exits when restore_command fails or the
+ * restored file is not found; this is taken to mean that all files were
+ * already delivered from archive and the requested file is one that was
+ * never stored in the archive.
+ */
+void
+WALPrefetchWorkerMain(Datum main_arg)
+{
+	int			rc;
+	char		xlogpath[MAXPGPATH];
+	char		xlogfnext[MAXFNAMELEN];
+	unsigned	increment;
+	XLogSegNo	nextSegNo;
+	uint16		bgwid;
+
+	/* Establish signal handlers. */
+	pqsignal(SIGTERM, die);
+	/* We're now ready to receive signals. */
+	BackgroundWorkerUnblockSignals();
+
+	/* Get the index of this worker's RestoreSlot */
+	bgwid = DatumGetUInt16(main_arg);
+
+	nextSegNo = RestoreData->restartSegNo + bgwid;
+	increment = wal_prefetch_workers;
+
+	OwnLatch(&RestoreData->slots[bgwid].continuePrefetching);
+
+	/*
+	 * Notify invoker that the WAL files prefetching worker has just been
+	 * successfully started.
+	 */
+	RestoreData->slots[bgwid].workerStarted = true;
+	SetLatch(&RestoreData->slots[bgwid].workerReady);
+
+	while (true)
+	{
+		SuspendPrefetchingIfRequired(bgwid);
+
+		XLogFileName(xlogfnext, RestoreData->restartTli, nextSegNo,
+					 wal_segment_size);
+
+		/* Prepare path. */
+		XLogFilePathPrefetch(xlogpath, xlogfnext);
+
+		/*
+		 * Make sure there is no such file in a directory for prefetched
+		 * files.
+		 */
+		FileUnlink(xlogpath);
+
+		/* Prepare and execute the restore command. */
+		if ((rc = DoRestore(xlogpath, xlogfnext, RestoreData->pointfname)))
+		{
+			FileUnlink(xlogpath);
+
+			if (wait_result_is_any_signal(rc, true))
+				proc_exit(1);
+
+			if (!FilePathExists(xlogpath))
+				SignalFileNotExist(xlogfnext);
+			else
+
+				/*
+				 * The intent here is that, although restore_command failed,
+				 * a file with this name may have been left in the prefetch
+				 * directory by the previous server start up; in that case
+				 * the file name is put into the hash table and the waiter in
+				 * RestoreCommandXLog() is woken up. NOTE(review):
+				 * FileUnlink(xlogpath) is called just above, so this branch
+				 * looks unreachable - confirm.
+				 */
+				SignalFileDelivered(xlogfnext);
+
+			ereport(INFO,
+					(errmsg("could not restore file \"%s\" from archive: %s",
+							xlogfnext, wait_result_to_str(rc))));
+
+			break;
+		}
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * Check that file has been really written to file system and if it
+		 * has then wake up the process that is waiting in
+		 * RestoreCommandXLog() to continue WAL file processing.
+		 */
+		if (FilePathExists(xlogpath))
+		{
+			SignalFileDelivered(xlogfnext);
+
+			ereport(INFO, errmsg("The file %s was retrieved to \"%s\"",
+								 xlogfnext, xlogpath));
+		}
+		else
+		{
+			/*
+			 * DoRestore() finished with success that means invocation of
+			 * system() API function completed without error. On the other
+			 * hand, the requested file is not found. That means something
+			 * wrong happened with script run by the API function system(),
+			 * e.g. the script doesn't really do something useful, or maybe
+			 * it put a file to a wrong destination. Anyway, it is time to
+			 * exit and give a chance to system administrator to fix the
+			 * issue.
+			 */
+			SignalFileNotExist(xlogfnext);
+
+			ereport(INFO, errmsg("The file %s is not found", xlogfnext));
+			break;
+		}
+
+		nextSegNo = nextSegNo + increment;
+	}
+	proc_exit(0);
+}
+
+/*
+ * Setup and spawn bgworker to prefetch WAL files from archive.
+ *
+ * Input:
+ *		bgwid - sequence number of bgworker process we are going to spawn
+ *
+ * Returns true on success and false on failure.
+ */
+static bool
+SpawnWALPrefetchWorker(uint16 bgwid)
+{
+	BackgroundWorker bgw;
+	RestoreSlot *slot = &RestoreData->slots[bgwid];
+
+	memset(&bgw, 0, sizeof(bgw));
+	snprintf(bgw.bgw_name, sizeof(bgw.bgw_name), "WAL prefetching worker #%d",
+			 bgwid);
+
+	/*
+	 * Length of the string literal "WAL files pre-fetching Worker" is less
+	 * than size of a buffer referenced by the data member bgw.bgw_type (the
+	 * size is limited by the constant BGW_MAXLEN that currently has value
+	 * 96). Therefore we can use function strcpy() instead of
+	 * strncpy/strlcpy to copy the string literal into the buffer
+	 * bgw.bgw_type. The same is true for other two string literals
+	 * "postgres" and "WALPrefetchWorkerMain" and their corresponding
+	 * destination buffers referenced by the data members
+	 * bgw.bgw_library_name, bgw.bgw_function_name. To guard against further
+	 * possible change of limit represented by the constant BGW_MAXLEN the
+	 * asserts have been inserted before invoking of the function strcpy() as
+	 * a sanity check. In case some of these asserts be fired it means that
+	 * some drastic change was done in the core code that should be studied.
+	 */
+	Assert(sizeof(bgw.bgw_type) >= sizeof("WAL files pre-fetching Worker"));
+	Assert(sizeof(bgw.bgw_library_name) >= sizeof("postgres"));
+	Assert(sizeof(bgw.bgw_function_name) >= sizeof("WALPrefetchWorkerMain"));
+
+	strcpy(bgw.bgw_type, "WAL files pre-fetching Worker");
+	strcpy(bgw.bgw_library_name, "postgres");
+	strcpy(bgw.bgw_function_name, "WALPrefetchWorkerMain");
+
+	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
+
+	/*
+	 * BgWorkerStart_PostmasterStart for PM_RECOVERY, PM_STARTUP
+	 * BgWorkerStart_ConsistentState for PM_HOT_STANDBY
+	 */
+	bgw.bgw_start_time = HotStandbyActive() ? BgWorkerStart_ConsistentState :
+		BgWorkerStart_PostmasterStart;
+
+	bgw.bgw_restart_time = BGW_NEVER_RESTART;
+
+	/*
+	 * The value of bgw.bgw_main_arg is passed as an argument to the function
+	 * WALPrefetchWorkerMain()
+	 */
+	bgw.bgw_main_arg = UInt16GetDatum(bgwid);
+	bgw.bgw_notify_pid = MyProcPid;
+
+	return RegisterDynamicBackgroundWorker(&bgw, &slot->bgwhandle);
+}
+
+/*
+ * Terminate the bgworker process whose slot is addressed by the specified
+ * index and free its handle; a slot without a handle is left untouched.
+ *
+ * Input:
+ *		slot_idx -	index of a slot in the array RestoreData->slots[] that
+ *					contains data about bgworker process.
+ */
+static void
+ShutdownWALPrefetchWorker(uint16 slot_idx)
+{
+	RestoreSlot *slot = &RestoreData->slots[slot_idx];
+
+	if (slot->bgwhandle != NULL)
+	{
+		TerminateBackgroundWorker(slot->bgwhandle);
+		pfree(slot->bgwhandle);
+		slot->bgwhandle = NULL;
+	}
+}
+
+/*
+ * Stop the WAL prefetching workers with indexes last_process_idx - 1 down
+ * to 0 (last_process_idx itself is excluded) and mark prefetching as shut
+ * down. This function is called either on shutdown or on start-up in case
+ * some of the WAL prefetching workers failed to start.
+ *
+ * Input:
+ *		last_process_idx - number of leading slots to shut down, i.e. the
+ *						   index one past the last worker to stop.
+ */
+static void
+ShutdownWALPrefetchWorkers(int last_process_idx)
+{
+	while (last_process_idx > 0)
+		ShutdownWALPrefetchWorker(--last_process_idx);
+
+	WALPrefetchingState = WALPrefetchingShutdown;
+}
+
+/*
+ * Start bgworker processes for retrieving WAL files from archive in
+ * pre-fetching mode and wait until all of them report that they are running.
+ * The number of processes to spawn is given by the GUC parameter
+ * wal_prefetch_workers. Does nothing unless prefetching is still inactive.
+ *
+ * Input:
+ *		xlogfname - the name of a WAL file from which to start recovery
+ *
+ * Throws a FATAL error if any of the WAL prefetching workers fails to start.
+ */
+static void
+StartWALPrefetchWorkers(const char *xlogfname)
+{
+	int			i;
+
+	if (WALPrefetchingState != WALPrefetchingIsInactive)
+		return;
+
+	XLogFromFileName(xlogfname, &RestoreData->restartTli,
+					 &RestoreData->restartSegNo, wal_segment_size);
+
+	for (i = 0; i < wal_prefetch_workers; ++i)
+		OwnLatch(&RestoreData->slots[i].workerReady);
+
+	OwnLatch(&RestoreData->fileAvailable);
+
+	for (i = 0; i < wal_prefetch_workers; ++i)
+	{
+		if (!SpawnWALPrefetchWorker(i))
+		{
+			ShutdownWALPrefetchWorkers(i);
+			ereport(FATAL,
+					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+					 errmsg("could not run background process for WAL files "
+							"pre-fetching")));
+		}
+
+	}
+
+	/*
+	 * Block (no timeout) until every worker has set its workerStarted flag.
+	 */
+	for (i = 0; i < wal_prefetch_workers; ++i)
+	{
+		while (!RestoreData->slots[i].workerStarted)
+		{
+			(void) WaitLatch(&RestoreData->slots[i].workerReady,
+							 WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
+							 0, PG_WAIT_EXTENSION);
+			ResetLatch(&RestoreData->slots[i].workerReady);
+		}
+	}
+
+	WALPrefetchingState = WALPrefetchingIsActive;
+}
+
+/*
+ * Write PREFETCH_DIR "/" xlogfname into path, bounded by MAXPGPATH bytes.
+ */
+static void
+XLogFilePathPrefetch(char *path, const char *xlogfname)
+{
+	snprintf(path, MAXPGPATH, PREFETCH_DIR "/%s", xlogfname);
+}
+
+/*
+ * Check whether the given path exists, using stat().
+ * Return:
+ *		true if stat() succeeds, false if it fails with ENOENT.
+ * Any other stat() failure is reported as a FATAL error.
+ */
+static bool
+FilePathExists(const char *xlogpath)
+{
+	struct stat statbuf;
+
+	if (stat(xlogpath, &statbuf) == 0)
+		return true;
+
+	if (errno != ENOENT)
+		ereport(FATAL,
+				(errcode_for_file_access(),
+				 errmsg("could not stat file \"%s\": %m",
+						xlogpath)));
+
+	return false;
+}
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 32a3099c1f4..1b826d0719a 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -18,13 +18,16 @@
 #include "postgres.h"
 
 #include <unistd.h>
+#include <sys/stat.h>
 
 #include "access/timeline.h"
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
+#include "common/archive.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/startup.h"
 #include "storage/smgr.h"
 #include "utils/guc.h"
 #include "utils/hsearch.h"
@@ -979,3 +982,162 @@ WALReadRaiseError(WALReadError *errinfo)
 						(Size) errinfo->wre_req)));
 	}
 }
+
+/*
+ * Remove a file if it does exist.
+ *
+ * A missing file is not an error: unlink() failing with ENOENT is silently
+ * ignored. Any other failure is reported as a FATAL error.
+ *
+ * The file is removed with a single unlink() call rather than with stat()
+ * followed by unlink(). Checking for existence first would leave a window
+ * in which the file could disappear between the two calls, turning the
+ * harmless "file is already gone" case into a FATAL error, and it would
+ * cost an extra system call.
+ *
+ * Input:
+ *		file_path - path of the file to remove
+ */
+void
+FileUnlink(const char *file_path)
+{
+	/* ENOENT just means there is nothing to remove */
+	if (unlink(file_path) != 0 && errno != ENOENT)
+		ereport(FATAL,
+				(errcode_for_file_access(),
+				 errmsg("could not remove file \"%s\": %m",
+						file_path)));
+}
+
+/*
+ * Store in lastRestartPointFname the name of the WAL segment holding the
+ * redo pointer returned by GetOldestRestartPoint().
+ *
+ * If cleanup is not enabled, use timeline 0 and segment 0 instead, which
+ * will prevent the deletion of any WAL files from the archive because of
+ * the alphabetic sorting property of WAL filenames.
+ */
+void
+XLogFileNameLastPoint(char *lastRestartPointFname, bool cleanupEnabled)
+{
+	XLogSegNo	restartSegNo;
+	XLogRecPtr	restartRedoPtr;
+	TimeLineID	restartTli;
+
+	if (cleanupEnabled)
+	{
+		GetOldestRestartPoint(&restartRedoPtr, &restartTli);
+		XLByteToSeg(restartRedoPtr, restartSegNo, wal_segment_size);
+		XLogFileName(lastRestartPointFname, restartTli, restartSegNo,
+					 wal_segment_size);
+	}
+	else
+		XLogFileName(lastRestartPointFname, 0, 0L, wal_segment_size);
+}
+
+/*
+ * Check that a file is really there now and has the correct size.
+ *
+ * Return true if the file exists and either expectedSize is not positive
+ * (no size check) or its size equals expectedSize; else return false.
+ *
+ * If the output variable file_not_found is not null, it is set to true
+ * when stat() fails with ENOENT and to false when stat() succeeds. Any
+ * other stat() failure is reported as a FATAL error.
+ */
+bool
+FileValidateSize(const char *xlogpath, off_t expectedSize,
+				 const char *xlogfname, bool *file_not_found)
+{
+	struct stat stat_buf;
+
+	if (stat(xlogpath, &stat_buf) == 0)
+	{
+		if (file_not_found)
+			*file_not_found = false;
+
+		if (expectedSize > 0 && stat_buf.st_size != expectedSize)
+		{
+			int			elevel;
+
+			/*
+			 * If we find a partial file in standby mode, we assume it's
+			 * because it's just being copied to the archive, and keep trying.
+			 *
+			 * Otherwise treat a wrong-sized file as FATAL to ensure the DBA
+			 * would notice it, but is that too strong? We could try to plow
+			 * ahead with a local copy of the file ... but the problem is that
+			 * there probably isn't one, and we'd incorrectly conclude we've
+			 * reached the end of WAL and we're done recovering ...
+			 */
+			if (StandbyMode && stat_buf.st_size < expectedSize)
+				elevel = DEBUG1;
+			else
+				elevel = FATAL;
+			ereport(elevel,
+					(errmsg("archive file \"%s\" has wrong size: %lld instead of %lld",
+							xlogfname,
+							(long long int) stat_buf.st_size,
+							(long long int) expectedSize)));
+			return false;
+		}
+		else
+			return true;
+	}
+	else
+	{
+		/* stat failed; only a missing file (ENOENT) is tolerated */
+		if (errno != ENOENT)
+			ereport(FATAL,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m",
+							xlogpath)));
+		if (file_not_found)
+			*file_not_found = true;
+
+		return false;
+	}
+
+}
+
+/*
+ * Build and execute restore_command.
+ *
+ * Return the value of system() unchanged: a wait status that the caller has
+ * to decode (e.g. with WIFEXITED()/WEXITSTATUS()), or -1 if a system error
+ * occurred. PANIC if the restore command cannot be built.
+ */
+int
+DoRestore(const char *xlogpath, const char *xlogfname, const char *pointfname)
+{
+	char	   *xlogRestoreCmd;
+	int			rc;
+
+	/* Build a restore command to execute */
+	xlogRestoreCmd = BuildRestoreCommand(recoveryRestoreCommand, xlogpath,
+										 xlogfname, pointfname);
+
+	if (xlogRestoreCmd == NULL)
+		elog(PANIC, "could not build restore command \"%s\"",
+			 recoveryRestoreCommand);
+
+	ereport(DEBUG3,
+			(errmsg_internal("executing restore command \"%s\"",
+							 xlogRestoreCmd)));
+
+	/*
+	 * Check signals before restore command and reset afterwards.
+	 */
+	PreRestoreCommand();
+
+	/*
+	 * Execute the command through the shell.
+	 */
+	rc = system(xlogRestoreCmd);
+
+	PostRestoreCommand();
+
+	pfree(xlogRestoreCmd);
+
+	return rc;
+}
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 5a9a0e34353..7e2b2db6a27 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "access/parallel.h"
+#include "access/xlogrestore.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -128,6 +129,9 @@ static const struct
 	},
 	{
 		"ApplyWorkerMain", ApplyWorkerMain
+	},
+	{
+		"WALPrefetchWorkerMain", WALPrefetchWorkerMain
 	}
 };
 
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 96c2aaabbd6..93e0167454f 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
 #include "access/subtrans.h"
 #include "access/syncscan.h"
 #include "access/twophase.h"
+#include "access/xlogrestore.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -149,6 +150,7 @@ CreateSharedMemoryAndSemaphores(void)
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
+		size = add_size(size, RestoreCommandShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -259,6 +261,7 @@ CreateSharedMemoryAndSemaphores(void)
 	WalSndShmemInit();
 	WalRcvShmemInit();
 	ApplyLauncherShmemInit();
+	RestoreCommandShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 02d2d267b5c..8e62bbe3014 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -37,6 +37,7 @@
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogrestore.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_authid.h"
 #include "catalog/storage.h"
@@ -225,6 +226,8 @@ static bool check_recovery_target_lsn(char **newval, void **extra, GucSource sou
 static void assign_recovery_target_lsn(const char *newval, void *extra);
 static bool check_primary_slot_name(char **newval, void **extra, GucSource source);
 static bool check_default_with_oids(bool *newval, void **extra, GucSource source);
+static bool check_wal_prefetch_workers(int *newval, void **extra,
+									   GucSource source);
 
 /* Private functions in guc-file.l that need to be called from guc.c */
 static ConfigVariable *ProcessConfigFileInternal(GucContext context,
@@ -3399,13 +3402,38 @@ static struct config_int ConfigureNamesInt[] =
 		check_huge_page_size, NULL, NULL
 	},
 
+	{
+		{"wal_max_prefetch_amount",
+			PGC_POSTMASTER,
+			WAL_ARCHIVE_RECOVERY,
+			gettext_noop("Set a max number of WAL files to keep up prefetched "
+						 "from archive"),
+			NULL
+		},
+		&wal_max_prefetch_amount,
+		DEFAULT_WAL_MAX_PREFETCH_AMOUNT, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"wal_prefetch_workers",
+			PGC_POSTMASTER,
+			WAL_ARCHIVE_RECOVERY,
+			gettext_noop("Set a number of background workers to run for "
+						 "prefetching WAL files from archive"),
+			NULL
+		},
+		&wal_prefetch_workers,
+		DEFAULT_TOTAL_PREFETCH_WORKERS, 0, MAX_BACKENDS,
+		check_wal_prefetch_workers, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
 	}
 };
 
-
 static struct config_real ConfigureNamesReal[] =
 {
 	{
@@ -11662,6 +11690,20 @@ check_max_worker_processes(int *newval, void **extra, GucSource source)
 	return true;
 }
 
+static bool
+check_wal_prefetch_workers(int *newval, void **extra, GucSource source)
+{
+	if (*newval > max_worker_processes)	/* uses that GUC's current value */
+	{
+		GUC_check_errdetail("A value of wal_prefetch_workers can't exceed "
+							"a value of max_worker_processes=%d",
+							max_worker_processes);
+		return false;			/* reject the new value */
+	}
+
+	return true;
+}
+
 static bool
 check_effective_io_concurrency(int *newval, void **extra, GucSource source)
 {
diff --git a/src/include/access/xlogrestore.h b/src/include/access/xlogrestore.h
new file mode 100644
index 00000000000..c657dd8c6e2
--- /dev/null
+++ b/src/include/access/xlogrestore.h
@@ -0,0 +1,43 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogrestore.h
+ *		Prototypes and definitions for parallel restore commands execution
+ *
+ * Copyright (c) 2020, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/include/access/xlogrestore.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef XLOGRESTORE_H
+#define XLOGRESTORE_H
+
+/* postgres.h (Datum, off_t) must be included first by the including .c file */
+
+/* GUC variables */
+extern int	wal_max_prefetch_amount;
+extern int	wal_prefetch_workers;
+
+/*
+ * Default max number of WAL files for pre-fetching from archive.
+ * Zero value means that WAL files prefetching is turned off by default.
+ */
+#define DEFAULT_WAL_MAX_PREFETCH_AMOUNT	0
+
+/*
+ * Default value for the number of prefetch workers started for
+ * recovering a database from archive.
+ */
+#define DEFAULT_TOTAL_PREFETCH_WORKERS 2
+
+extern Size RestoreCommandShmemSize(void);
+extern void RestoreCommandShmemInit(void);
+extern bool RestoreCommandXLog(char *path, const char *xlogfname,
+							   const char *recovername,
+							   const off_t expectedSize,
+							   bool cleanupEnabled);
+extern void WALPrefetchWorkerMain(Datum main_arg) pg_attribute_noreturn();
+
+#endif							/* XLOGRESTORE_H */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index e59b6cf3a9f..98078518e88 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -60,4 +60,13 @@ extern void XLogReadDetermineTimeline(XLogReaderState *state,
 
 extern void WALReadRaiseError(WALReadError *errinfo);
 
+extern void FileUnlink(const char *file_path);
+extern void XLogFileNameLastPoint(char *lastRestartPointFname,
+								  bool cleanupEnabled);
+extern bool FileValidateSize(const char *xlogpath, off_t expectedSize,
+							 const char *xlogfname, bool *file_not_found);
+
+extern int	DoRestore(const char *xlogpath, const char *xlogfname,
+					  const char *pointfname);
+
 #endif
diff --git a/src/test/recovery/t/021_xlogrestore.pl b/src/test/recovery/t/021_xlogrestore.pl
new file mode 100644
index 00000000000..df5d573a4a7
--- /dev/null
+++ b/src/test/recovery/t/021_xlogrestore.pl
@@ -0,0 +1,138 @@
+#
+# Test for xlogrestore with wal_max_prefetch_amount parameter
+#
+
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 9;
+
+sub measure_replica_restore_time
+{
+	my ( $replica_name, $node_primary, $backup_name, $last_lsn, $tab_int_count, $config ) = @_;
+	my $timer = time();    # start of the measured interval
+
+	# Initialize replica node from backup, fetching WAL from archives
+	my $node_replica = get_new_node( $replica_name );
+	$node_replica->init_from_backup( $node_primary, $backup_name,
+		has_restoring => 1 );
+	$node_replica->append_conf( 'postgresql.conf', $config );
+	$node_replica->start();
+
+	# Wait until necessary replay has been done on replica
+	my $caughtup_query =
+	  "SELECT '$last_lsn'::pg_lsn <= pg_last_wal_replay_lsn()";
+	$node_replica->poll_query_until( 'postgres', $caughtup_query )
+	  or die "Timed out while waiting for replica to catch up";
+
+	# Check that the replica sees the expected number of rows in tab_int
+	my $replica_tab_int_count =
+	  $node_replica->safe_psql( 'postgres', "SELECT count(*) FROM tab_int" );
+	is( $replica_tab_int_count, $tab_int_count, 'tab_int sizes are equal' );
+
+	# Promote, then check that the temporary files and the prefetch directory
+	# generated during archive recovery are gone.
+	$node_replica->promote();
+
+	my $node_replica_data = $node_replica->data_dir;
+	ok( !-f "$node_replica_data/pg_wal/RECOVERYHISTORY",
+		"RECOVERYHISTORY removed after promotion");
+	ok( !-f "$node_replica_data/pg_wal/RECOVERYXLOG",
+			"RECOVERYXLOG removed after promotion");
+	ok( !-d "$node_replica_data/pg_wal/prefetch",
+		"pg_wal/prefetch dir removed after promotion");
+
+	my $res = time() - $timer;    # elapsed time, promotion and checks included
+
+	$node_replica->stop();
+	return $res;
+}
+
+# Number of loop iterations below; each one ends with pg_switch_wal()
+my $wal_count = 64;
+
+# Upper bound of generate_series() for every INSERT below
+my $wal_data_portion = 128;
+
+# Delay, in seconds, put in front of every restore to imitate a slow archive
+my $restore_sleep = 0.256;
+
+# Initialize primary node, doing archives
+my $node_primary = get_new_node( 'primary' );
+$node_primary->init(
+	has_archiving    => 1,
+	allows_streaming => 1
+);
+
+# Start it
+$node_primary->start;
+
+# Take backup for replica.
+my $backup_name = 'my_backup';
+$node_primary->backup( $backup_name );
+
+# Create content after the backup, switching WAL after each statement.
+for ( my $i = 0; $i < $wal_count; $i++ )
+{
+	if ( $i == 0 ) {
+		$node_primary->safe_psql('postgres',
+			"CREATE TABLE tab_int ( a SERIAL NOT NULL PRIMARY KEY );")
+	} else {
+		$node_primary->safe_psql('postgres',
+			"INSERT INTO tab_int SELECT FROM generate_series( 1, $wal_data_portion );")
+	}
+	$node_primary->safe_psql('postgres', "SELECT pg_switch_wal()");
+}
+
+my $last_lsn = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn();");
+my $tab_int_count = $node_primary->safe_psql('postgres', "SELECT count(*) FROM tab_int;");
+
+$node_primary->stop();
+
+# Build a restore_command that waits $restore_sleep seconds, then copies %f to %p
+my $restore_command;
+my $path = TestLib::perl2host( $node_primary->archive_dir );
+if ( $TestLib::windows_os ) {
+	$path =~ s{\\}{\\\\}g;
+	$restore_command = qq(perl -e "select( undef, undef, undef, $restore_sleep );" & copy "$path\\\\%f" "%p");
+} else {
+	$restore_command = qq(sleep ) . $restore_sleep . qq( && cp "$path/%f" "%p");
+}
+
+# DEBUG: don't forget to remove this
+diag("restore_command=$restore_command");
+
+# Compare the replica restore times with and without prefetching of WAL files.
+diag('Run database restoring with prefetching of WAL files');
+my $multiple_workers_restore_time = measure_replica_restore_time(
+	'fast_restored_replica',
+	$node_primary,
+	$backup_name,
+	$last_lsn,
+	$tab_int_count,
+qq(
+wal_retrieve_retry_interval = '100ms'
+wal_max_prefetch_amount = 8
+wal_prefetch_workers = 4
+restore_command = '$restore_command'
+log_min_messages = INFO
+));
+
+diag('Run database restoring in regular way without prefetching of WAL files');
+my $single_worker_restore_time = measure_replica_restore_time(
+	'normal_restored_replica',
+	$node_primary,
+	$backup_name,
+	$last_lsn,
+	$tab_int_count,
+qq(
+wal_retrieve_retry_interval = '100ms'
+wal_max_prefetch_amount = 0
+restore_command = '$restore_command'
+));
+
+diag("multiple_workers_restore_time = $multiple_workers_restore_time");
+diag("single_worker_restore_time = $single_worker_restore_time");
+
+ok( $multiple_workers_restore_time < $single_worker_restore_time, "Multiple workers are faster than a single worker" );
diff --git a/src/test/regress/expected/guc.out b/src/test/regress/expected/guc.out
index 811f80a0976..8283816e3c8 100644
--- a/src/test/regress/expected/guc.out
+++ b/src/test/regress/expected/guc.out
@@ -776,3 +776,24 @@ set default_with_oids to f;
 -- Should not allow to set it to true.
 set default_with_oids to t;
 ERROR:  tables declared WITH OIDS are not supported
+--
+-- Check that a value for the new configration parameter
+-- wal_prefetch_workers is limited by a value of the parameter
+-- max_worker_processes
+SHOW max_worker_processes;
+ max_worker_processes 
+----------------------
+ 8
+(1 row)
+
+-- max_worker_processes has default value 8
+-- Check that an attempt to set the parameter wal_prefetch_workers
+-- to a value execeeding this limit results in error
+ALTER SYSTEM SET wal_prefetch_workers = 16; -- fails, it is expected behaviour
+ERROR:  invalid value for parameter "wal_prefetch_workers": 16
+DETAIL:  A value of wal_prefetch_workers can't exceed a value of max_worker_processes=8
+-- Check that a value lesser than max_worker_processes can be assigned
+-- to the parameter wal_prefetch_workers
+ALTER SYSTEM SET wal_prefetch_workers = 7; -- ok since 7 < max_worker_processes
+-- Reset to default
+ALTER SYSTEM RESET wal_prefetch_workers;
diff --git a/src/test/regress/sql/guc.sql b/src/test/regress/sql/guc.sql
index 43dbba3775e..d9215da9ee3 100644
--- a/src/test/regress/sql/guc.sql
+++ b/src/test/regress/sql/guc.sql
@@ -296,3 +296,22 @@ reset check_function_bodies;
 set default_with_oids to f;
 -- Should not allow to set it to true.
 set default_with_oids to t;
+
+--
+-- Check that a value for the new configration parameter
+-- wal_prefetch_workers is limited by a value of the parameter
+-- max_worker_processes
+SHOW max_worker_processes;
+
+-- max_worker_processes has default value 8
+-- Check that an attempt to set the parameter wal_prefetch_workers
+-- to a value execeeding this limit results in error
+
+ALTER SYSTEM SET wal_prefetch_workers = 16; -- fails, it is expected behaviour
+
+-- Check that a value lesser than max_worker_processes can be assigned
+-- to the parameter wal_prefetch_workers
+ALTER SYSTEM SET wal_prefetch_workers = 7; -- ok since 7 < max_worker_processes
+
+-- Reset to default
+ALTER SYSTEM RESET wal_prefetch_workers;
-- 
2.24.3 (Apple Git-128)

Reply via email to