On 22.06.2018 11:35, Konstantin Knizhnik wrote:
On 21.06.2018 19:57, Tomas Vondra wrote:
On 06/21/2018 04:01 PM, Konstantin Knizhnik wrote:
I continue my experiments with WAL prefetch.
I have embedded prefetch in Postgres: now walprefetcher is started
together with startup process and is able to help it to speedup
recovery.
The patch is attached.
Unfortunately the result is negative (at least on my desktop: SSD, 16Gb
RAM). Recovery with prefetch is 3 times slower than without it.
What I am doing:
Configuration:
max_wal_size=min_wal_size=10Gb,
shared_buffers = 1Gb
Database:
pgbench -i -s 1000
Test:
pgbench -c 10 -M prepared -N -T 100 -P 1
pkill postgres
echo 3 > /proc/sys/vm/drop_caches
time pg_ctl -t 1000 -D pgsql -l logfile start
Without prefetch it is 19 seconds (recovered about 4Gb of WAL), with
prefetch it is about one minute. About 400k blocks are prefetched.
CPU usage is small (<20%), both processes are in "Ds" state.
Based on a quick test, my guess is that the patch is broken in
several ways. Firstly, with the patch attached (and
wal_prefetch_enabled=on, which I think is needed to enable the
prefetch) I can't even restart the server, because pg_ctl restart
just hangs (the walprefetcher process gets stuck in WaitForWAL, IIRC).
I have added an elog(LOG,...) to walprefetcher.c, right before the
FilePrefetch call, and (a) I don't see any actual prefetch calls
during recovery but (b) I do see the prefetch happening during the
pgbench. That seems a bit ... wrong?
Furthermore, you've added an extra
signal_child(BgWriterPID, SIGHUP);
to SIGHUP_handler, which seems like a bug too. I don't have time to
investigate/debug this further.
regards
Sorry, updated version of the patch is attached.
Please also notice that you can check number of prefetched pages using
pg_stat_activity() - it is reported for walprefetcher process.
Concerning the fact that you do not see prefetches at recovery time:
please check that min_wal_size and max_wal_size are large enough and
pgbench (or whatever else)
committed large enough changes so that recovery will take some time.
I have improved my WAL prefetch patch. The main reason for the slowdown of
recovery with prefetch enabled was that it didn't take into account
initialized pages (XLOG_HEAP_INIT_PAGE)
and didn't remember (cache) full page writes.
The main differences of new version of the patch:
1. Use effective_cache_size as size of cache of prefetched blocks
2. Do not prefetch blocks present in shared buffers
3. Do not prefetch blocks for RM_HEAP_ID with XLOG_HEAP_INIT_PAGE bit set
4. Remember new/fpw pages in prefetch cache, to avoid prefetching them for
subsequent WAL records.
5. Add min/max prefetch lead parameters to make it possible to
synchronize speed of prefetch with speed of replay.
6. Increase size of open file cache to avoid redundant open/close
operations.
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 1b000a2..9730b42 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -879,13 +879,6 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
return true;
}
-#ifdef FRONTEND
-/*
- * Functions that are currently not needed in the backend, but are better
- * implemented inside xlogreader.c because of the internal facilities available
- * here.
- */
-
/*
* Find the first record with an lsn >= RecPtr.
*
@@ -1004,9 +997,6 @@ out:
return found;
}
-#endif /* FRONTEND */
-
-
/* ----------------------------------------
* Functions for decoding the data and block references in a record.
* ----------------------------------------
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 52fe55e..7847311 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -652,7 +652,7 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
* in walsender.c but for small differences (such as lack of elog() in
* frontend). Probably these should be merged at some point.
*/
-static void
+void
XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
Size count)
{
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 7e34bee..e492715 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -32,6 +32,7 @@
#include "postmaster/bgwriter.h"
#include "postmaster/startup.h"
#include "postmaster/walwriter.h"
+#include "postmaster/walprefetcher.h"
#include "replication/walreceiver.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
@@ -335,6 +336,9 @@ AuxiliaryProcessMain(int argc, char *argv[])
case WalReceiverProcess:
statmsg = pgstat_get_backend_desc(B_WAL_RECEIVER);
break;
+ case WalPrefetcherProcess:
+ statmsg = pgstat_get_backend_desc(B_WAL_PREFETCHER);
+ break;
default:
statmsg = "??? process";
break;
@@ -462,6 +466,11 @@ AuxiliaryProcessMain(int argc, char *argv[])
WalReceiverMain();
proc_exit(1); /* should never return */
+ case WalPrefetcherProcess:
+ /* don't set signals, walprefetcher has its own agenda */
+ WalPrefetcherMain();
+ proc_exit(1); /* should never return */
+
default:
elog(PANIC, "unrecognized process type: %d", (int) MyAuxProcType);
proc_exit(1);
diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index 71c2321..13e5066 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -13,6 +13,6 @@ top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \
- pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o
+ pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o walprefetcher.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 084573e..7195578 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -2870,6 +2870,9 @@ pgstat_bestart(void)
case WalReceiverProcess:
beentry->st_backendType = B_WAL_RECEIVER;
break;
+ case WalPrefetcherProcess:
+ beentry->st_backendType = B_WAL_PREFETCHER;
+ break;
default:
elog(FATAL, "unrecognized process type: %d",
(int) MyAuxProcType);
@@ -3519,6 +3522,9 @@ pgstat_get_wait_activity(WaitEventActivity w)
case WAIT_EVENT_WAL_WRITER_MAIN:
event_name = "WalWriterMain";
break;
+ case WAIT_EVENT_WAL_PREFETCHER_MAIN:
+ event_name = "WalPrefetcherMain";
+ break;
/* no default case, so that compiler will warn */
}
@@ -4126,6 +4132,9 @@ pgstat_get_backend_desc(BackendType backendType)
case B_WAL_RECEIVER:
backendDesc = "walreceiver";
break;
+ case B_WAL_PREFETCHER:
+ backendDesc = "walprefetcher";
+ break;
case B_WAL_SENDER:
backendDesc = "walsender";
break;
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index a4b53b3..1f3598d 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -254,7 +254,9 @@ static pid_t StartupPID = 0,
AutoVacPID = 0,
PgArchPID = 0,
PgStatPID = 0,
- SysLoggerPID = 0;
+ SysLoggerPID = 0,
+ WalPrefetcherPID = 0
+;
/* Startup process's status */
typedef enum
@@ -362,6 +364,9 @@ static volatile bool avlauncher_needs_signal = false;
/* received START_WALRECEIVER signal */
static volatile sig_atomic_t WalReceiverRequested = false;
+/* received START_WALPREFETCHER signal */
+static volatile sig_atomic_t WalPrefetcherRequested = false;
+
/* set when there's a worker that needs to be started up */
static volatile bool StartWorkerNeeded = true;
static volatile bool HaveCrashedWorker = false;
@@ -549,6 +554,7 @@ static void ShmemBackendArrayRemove(Backend *bn);
#define StartupDataBase() StartChildProcess(StartupProcess)
#define StartBackgroundWriter() StartChildProcess(BgWriterProcess)
#define StartCheckpointer() StartChildProcess(CheckpointerProcess)
+#define StartWalPrefetcher() StartChildProcess(WalPrefetcherProcess)
#define StartWalWriter() StartChildProcess(WalWriterProcess)
#define StartWalReceiver() StartChildProcess(WalReceiverProcess)
@@ -1373,6 +1379,9 @@ PostmasterMain(int argc, char *argv[])
StartupStatus = STARTUP_RUNNING;
pmState = PM_STARTUP;
+ /* Start Wal prefetcher now because it may speed-up WAL redo */
+ WalPrefetcherPID = StartWalPrefetcher();
+
/* Some workers may be scheduled to start now */
maybe_start_bgworkers();
@@ -2535,6 +2544,8 @@ SIGHUP_handler(SIGNAL_ARGS)
signal_child(BgWriterPID, SIGHUP);
if (CheckpointerPID != 0)
signal_child(CheckpointerPID, SIGHUP);
+ if (WalPrefetcherPID != 0)
+ signal_child(WalPrefetcherPID, SIGHUP);
if (WalWriterPID != 0)
signal_child(WalWriterPID, SIGHUP);
if (WalReceiverPID != 0)
@@ -2685,6 +2696,8 @@ pmdie(SIGNAL_ARGS)
signal_child(BgWriterPID, SIGTERM);
if (WalReceiverPID != 0)
signal_child(WalReceiverPID, SIGTERM);
+ if (WalPrefetcherPID != 0)
+ signal_child(WalPrefetcherPID, SIGTERM);
if (pmState == PM_RECOVERY)
{
SignalSomeChildren(SIGTERM, BACKEND_TYPE_BGWORKER);
@@ -2864,6 +2877,8 @@ reaper(SIGNAL_ARGS)
*/
if (CheckpointerPID == 0)
CheckpointerPID = StartCheckpointer();
+ if (WalPrefetcherPID == 0)
+ WalPrefetcherPID = StartWalPrefetcher();
if (BgWriterPID == 0)
BgWriterPID = StartBackgroundWriter();
if (WalWriterPID == 0)
@@ -2967,6 +2982,20 @@ reaper(SIGNAL_ARGS)
}
/*
+ * Was it the wal prefetcher? Normal exit can be ignored; we'll start a
+ * new one at the next iteration of the postmaster's main loop, if
+ * necessary. Any other exit condition is treated as a crash.
+ */
+ if (pid == WalPrefetcherPID)
+ {
+ WalPrefetcherPID = 0;
+ if (!EXIT_STATUS_0(exitstatus))
+ HandleChildCrash(pid, exitstatus,
+ _("WAL prefetcher process"));
+ continue;
+ }
+
+ /*
* Was it the wal writer? Normal exit can be ignored; we'll start a
* new one at the next iteration of the postmaster's main loop, if
* necessary. Any other exit condition is treated as a crash.
@@ -3451,6 +3480,18 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
signal_child(WalWriterPID, (SendStop ? SIGSTOP : SIGQUIT));
}
+ /* Take care of the walprefetcher too */
+ if (pid == WalPrefetcherPID)
+ WalPrefetcherPID = 0;
+ else if (WalPrefetcherPID != 0 && take_action)
+ {
+ ereport(DEBUG2,
+ (errmsg_internal("sending %s to process %d",
+ (SendStop ? "SIGSTOP" : "SIGQUIT"),
+ (int) WalPrefetcherPID)));
+ signal_child(WalPrefetcherPID, (SendStop ? SIGSTOP : SIGQUIT));
+ }
+
/* Take care of the walreceiver too */
if (pid == WalReceiverPID)
WalReceiverPID = 0;
@@ -3657,6 +3698,7 @@ PostmasterStateMachine(void)
if (CountChildren(BACKEND_TYPE_NORMAL | BACKEND_TYPE_WORKER) == 0 &&
StartupPID == 0 &&
WalReceiverPID == 0 &&
+ WalPrefetcherPID == 0 &&
BgWriterPID == 0 &&
(CheckpointerPID == 0 ||
(!FatalError && Shutdown < ImmediateShutdown)) &&
@@ -3757,6 +3799,7 @@ PostmasterStateMachine(void)
Assert(WalReceiverPID == 0);
Assert(BgWriterPID == 0);
Assert(CheckpointerPID == 0);
+ Assert(WalPrefetcherPID == 0);
Assert(WalWriterPID == 0);
Assert(AutoVacPID == 0);
/* syslogger is not considered here */
@@ -3946,6 +3989,8 @@ TerminateChildren(int signal)
signal_child(WalWriterPID, signal);
if (WalReceiverPID != 0)
signal_child(WalReceiverPID, signal);
+ if (WalPrefetcherPID != 0)
+ signal_child(WalPrefetcherPID, signal);
if (AutoVacPID != 0)
signal_child(AutoVacPID, signal);
if (PgArchPID != 0)
@@ -5041,6 +5086,10 @@ sigusr1_handler(SIGNAL_ARGS)
Assert(BgWriterPID == 0);
BgWriterPID = StartBackgroundWriter();
+ /* WAL prefetcher is expected to be started earlier but if not, try to start it now */
+ if (WalPrefetcherPID == 0)
+ WalPrefetcherPID = StartWalPrefetcher();
+
/*
* Start the archiver if we're responsible for (re-)archiving received
* files.
@@ -5361,6 +5410,10 @@ StartChildProcess(AuxProcType type)
ereport(LOG,
(errmsg("could not fork WAL receiver process: %m")));
break;
+ case WalPrefetcherProcess:
+ ereport(LOG,
+ (errmsg("could not fork WAL prefetcher process: %m")));
+ break;
default:
ereport(LOG,
(errmsg("could not fork process: %m")));
diff --git a/src/backend/postmaster/walprefetcher.c b/src/backend/postmaster/walprefetcher.c
new file mode 100644
index 0000000..3c8beba
--- /dev/null
+++ b/src/backend/postmaster/walprefetcher.c
@@ -0,0 +1,648 @@
+/*-------------------------------------------------------------------------
+ *
+ * walprefetcher.c
+ *
+ * Replaying WAL is done by a single process, which may make recovery slow
+ * and cause lag between master and replica.
+ *
+ * The prefetcher tries to preload into the OS file cache the blocks
+ * referenced by WAL records, in order to speed up recovery.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/postmaster/walprefetcher.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <signal.h>
+
+#include "access/heapam_xlog.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xloginsert.h"
+#include "access/xlogutils.h"
+#include "access/xlogreader.h"
+#include "access/xlogrecord.h"
+#include "libpq/pqsignal.h"
+#include "miscadmin.h"
+#include "optimizer/cost.h"
+#include "pgstat.h"
+#include "portability/instr_time.h"
+#include "postmaster/walprefetcher.h"
+#include "replication/walreceiver.h"
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "storage/buf_internals.h"
+#include "utils/guc.h"
+#include "utils/pg_lsn.h"
+#include "utils/memutils.h"
+
+#define KB (1024LL)
+/* #define DEBUG_PREFETCH 1 */
+
+#if DEBUG_PREFETCH
+#define LOG_LEVEL LOG
+#else
+#define LOG_LEVEL DEBUG1
+#endif
+
+/*
+ * GUC parameters
+ */
+int WalPrefetchMinLead = 0;
+int WalPrefetchMaxLead = 0;
+int WalPrefetchPollInterval = 1000;
+bool WalPrefetchEnabled = false;
+
+/*
+ * Flags set by interrupt handlers for later service in the main loop.
+ */
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t shutdown_requested = false;
+
+/* Signal handlers */
+static void WpfQuickDie(SIGNAL_ARGS);
+static void WpfSigHupHandler(SIGNAL_ARGS);
+static void WpfShutdownHandler(SIGNAL_ARGS);
+static void WpfSigusr1Handler(SIGNAL_ARGS);
+
+/*
+ * Main entry point for the walprefetcher auxiliary process.
+ *
+ * Installs the signal handlers, creates a private memory context, sets up
+ * an error-recovery point, and then loops forever: each iteration services
+ * pending SIGHUP/shutdown requests, runs WalPrefetch() when
+ * wal_prefetch_enabled is on, and then sleeps on the process latch.
+ *
+ * Leaves via proc_exit(0) on a shutdown request and via exit(1) if the
+ * postmaster dies; it never returns to the caller.
+ */
+void
+WalPrefetcherMain(void)
+{
+	sigjmp_buf	local_sigjmp_buf;
+	MemoryContext walprefetcher_context;
+	int			rc;
+
+	pqsignal(SIGHUP, WpfSigHupHandler); /* set flag to read config file */
+	pqsignal(SIGINT, WpfShutdownHandler);	/* request shutdown */
+	pqsignal(SIGTERM, WpfShutdownHandler);	/* request shutdown */
+	pqsignal(SIGQUIT, WpfQuickDie); /* hard crash time */
+	pqsignal(SIGALRM, SIG_IGN);
+	pqsignal(SIGPIPE, SIG_IGN);
+	pqsignal(SIGUSR1, WpfSigusr1Handler);
+	pqsignal(SIGUSR2, SIG_IGN); /* not used */
+
+	/*
+	 * Reset some signals that are accepted by postmaster but not here
+	 */
+	pqsignal(SIGCHLD, SIG_DFL);
+	pqsignal(SIGTTIN, SIG_DFL);
+	pqsignal(SIGTTOU, SIG_DFL);
+	pqsignal(SIGCONT, SIG_DFL);
+	pqsignal(SIGWINCH, SIG_DFL);
+
+	/* We allow SIGQUIT (quickdie) at all times */
+	sigdelset(&BlockSig, SIGQUIT);
+
+	/*
+	 * Create a memory context that we will do all our work in.  We do this so
+	 * that we can reset the context during error recovery and thereby avoid
+	 * possible memory leaks; resetting TopMemoryContext instead would be a
+	 * really bad idea.
+	 */
+	walprefetcher_context = AllocSetContextCreate(TopMemoryContext,
+												  "Wal Prefetcher",
+												  ALLOCSET_DEFAULT_SIZES);
+	MemoryContextSwitchTo(walprefetcher_context);
+
+	/*
+	 * If an exception is encountered, processing resumes here.
+	 *
+	 * This code is heavily based on bgwriter.c, q.v.
+	 */
+	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+	{
+		/* Since not using PG_TRY, must reset error stack by hand */
+		error_context_stack = NULL;
+
+		/* Prevent interrupts while cleaning up */
+		HOLD_INTERRUPTS();
+
+		/* Report the error to the server log */
+		EmitErrorReport();
+
+		pgstat_report_wait_end();
+		AtEOXact_Files(false);
+
+		/*
+		 * Now return to normal top-level context and clear ErrorContext for
+		 * next time.
+		 */
+		MemoryContextSwitchTo(walprefetcher_context);
+		FlushErrorState();
+
+		/* Flush any leaked data in the top-level context */
+		MemoryContextResetAndDeleteChildren(walprefetcher_context);
+
+		/* Now we can allow interrupts again */
+		RESUME_INTERRUPTS();
+
+		/*
+		 * Sleep at least 1 second after any error.  The same error is likely
+		 * to be repeated, and we don't want to be filling the error logs as
+		 * fast as we can.
+		 */
+		pg_usleep(1000000L);
+	}
+
+	/* We can now handle ereport(ERROR) */
+	PG_exception_stack = &local_sigjmp_buf;
+
+	/*
+	 * Unblock signals (they were blocked when the postmaster forked us)
+	 */
+	PG_SETMASK(&UnBlockSig);
+
+	/*
+	 * Loop forever
+	 */
+	for (;;)
+	{
+		/* Clear any already-pending wakeups */
+		ResetLatch(MyLatch);
+
+		/*
+		 * Process any requests or signals received recently.
+		 */
+		if (got_SIGHUP)
+		{
+			got_SIGHUP = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+		if (shutdown_requested)
+		{
+			/* Normal exit from the walprefetcher is here */
+			proc_exit(0);		/* done */
+		}
+
+		/* WalPrefetch() only returns once a shutdown has been requested */
+		if (WalPrefetchEnabled)
+			WalPrefetch(InvalidXLogRecPtr);
+
+		/*
+		 * Sleep until we are signaled
+		 */
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_POSTMASTER_DEATH,
+					   -1,
+					   WAIT_EVENT_WAL_PREFETCHER_MAIN);
+
+		/*
+		 * Emergency bailout if postmaster has died.  This is to avoid the
+		 * necessity for manual cleanup of all postmaster children.
+		 */
+		if (rc & WL_POSTMASTER_DEATH)
+			exit(1);
+	}
+}
+
+
+/* --------------------------------
+ * signal handler routines
+ * --------------------------------
+ */
+
+/*
+ * WpfQuickDie() occurs when signalled SIGQUIT by the postmaster.
+ *
+ * Some backend has bought the farm,
+ * so we need to stop what we're doing and exit.
+ *
+ * The handler blocks further signals (BlockSig), discards all registered
+ * exit callbacks with on_exit_reset(), and terminates the process with
+ * exit status 2.
+ */
+static void
+WpfQuickDie(SIGNAL_ARGS)
+{
+ PG_SETMASK(&BlockSig);
+
+ /*
+ * We DO NOT want to run proc_exit() callbacks -- we're here because
+ * shared memory may be corrupted, so we don't want to try to clean up our
+ * transaction. Just nail the windows shut and get out of town. Now that
+ * there's an atexit callback to prevent third-party code from breaking
+ * things by calling exit() directly, we have to reset the callbacks
+ * explicitly to make this work as intended.
+ */
+ on_exit_reset();
+
+ /*
+ * Note we do exit(2) not exit(0). This is to force the postmaster into a
+ * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
+ * backend. This is necessary precisely because we don't clean up our
+ * shared memory state. (The "dead man switch" mechanism in pmsignal.c
+ * should ensure the postmaster sees this as a crash, too, but no harm in
+ * being doubly sure.)
+ */
+ exit(2);
+}
+
+/*
+ * SIGHUP: set flag to re-read config file at next convenient time.
+ *
+ * Only records the request in got_SIGHUP and sets the process latch so that
+ * a sleeping main loop wakes up; the configuration itself is re-read
+ * outside of signal context.
+ */
+static void
+WpfSigHupHandler(SIGNAL_ARGS)
+{
+	/* The handler may interrupt code that is about to inspect errno */
+	int			save_errno = errno;
+
+	got_SIGHUP = true;
+	SetLatch(MyLatch);
+
+	errno = save_errno;
+}
+
+/*
+ * SIGINT/SIGTERM: set flag to exit normally.
+ *
+ * Only records the request in shutdown_requested and sets the process latch
+ * so that a sleeping loop wakes up and notices it.
+ */
+static void
+WpfShutdownHandler(SIGNAL_ARGS)
+{
+	/* The handler may interrupt code that is about to inspect errno */
+	int			save_errno = errno;
+
+	shutdown_requested = true;
+	SetLatch(MyLatch);
+
+	errno = save_errno;
+}
+
+/* SIGUSR1: used for latch wakeups */
+static void
+WpfSigusr1Handler(SIGNAL_ARGS)
+{
+	/* The handler may interrupt code that is about to inspect errno */
+	int			save_errno = errno;
+
+	latch_sigusr1_handler();
+
+	errno = save_errno;
+}
+
+/*
+ * Now wal prefetch code itself.
+ */
+static int
+WalReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr,
+ int reqLen, XLogRecPtr targetRecPtr, char *cur_page,
+ TimeLineID *pageTLI);
+
+#define FILE_HASH_SIZE 1009 /* Size of opened files hash */
+#define STAT_REFRESH_PERIOD 1024 /* Refresh backend status rate */
+
+/*
+ * Block LRU hash table is used to keep information about most recently prefetched blocks.
+ *
+ * Every entry lives in two structures at once: a circular doubly-linked LRU
+ * list (next/prev, anchored at the 'lru' sentinel) and a singly-linked
+ * collision chain of its hash bucket (collision).
+ */
+typedef struct BlockHashEntry
+{
+ struct BlockHashEntry* next; /* LRU list link; lru.next is the most recently used entry */
+ struct BlockHashEntry* prev; /* LRU list link; lru.prev is the eviction victim */
+ struct BlockHashEntry* collision; /* next entry in the same hash bucket, or NULL */
+ BufferTag tag; /* identity of the cached block */
+ uint32 hash; /* bucket index: hash code already reduced modulo block_hash_size */
+} BlockHashEntry;
+
+static BlockHashEntry** block_hash_table; /* array of bucket heads, allocated in WalPrefetch() */
+static size_t block_hash_size; /* number of buckets, also the maximum number of entries */
+static size_t block_hash_used; /* number of entries allocated so far */
+static BlockHashEntry lru = {&lru, &lru}; /* list sentinel; points to itself when the list is empty */
+static TimeLineID replay_timeline; /* timeline reported by GetXLogReplayRecPtr(), used by WalReadPage() */
+
+/*
+ * Minimal circular doubly-linked list helpers used for the LRU list.
+ *
+ * unlink_block: detach 'entry' from its list by joining its two neighbours.
+ * The entry's own next/prev pointers are left untouched.
+ */
+static void
+unlink_block(BlockHashEntry* entry)
+{
+	BlockHashEntry *after = entry->next;
+	BlockHashEntry *before = entry->prev;
+
+	after->prev = before;
+	before->next = after;
+}
+
+/*
+ * link_block_after: insert 'entry' into the list immediately after 'head'.
+ * The entry must not currently be linked into the list.
+ */
+static void
+link_block_after(BlockHashEntry* head, BlockHashEntry* entry)
+{
+	BlockHashEntry *successor = head->next;
+
+	entry->next = successor;
+	entry->prev = head;
+	successor->prev = entry;
+	head->next = entry;
+}
+
+/*
+ * Record a block in the LRU hash.
+ *
+ * If the block is already present it is moved to the head of the LRU list
+ * and false is returned.  Otherwise a new entry is created for it (recycling
+ * the least recently used entry once the cache holds block_hash_size
+ * entries), placed at the head of the LRU list, and true is returned.
+ */
+static bool
+put_block_in_cache(BufferTag* tag)
+{
+	uint32		bucket;
+	BlockHashEntry *item;
+
+	bucket = BufTableHashCode(tag) % block_hash_size;
+
+	/* Cache hit: just promote the entry to the most-recently-used position */
+	item = block_hash_table[bucket];
+	while (item != NULL)
+	{
+		if (BUFFERTAGS_EQUAL(item->tag, *tag))
+		{
+			unlink_block(item);
+			link_block_after(&lru, item);
+			return false;
+		}
+		item = item->collision;
+	}
+
+	if (block_hash_used != block_hash_size)
+	{
+		/* Still room: allocate a brand new entry */
+		item = (BlockHashEntry*)palloc(sizeof(BlockHashEntry));
+		block_hash_used += 1;
+	}
+	else
+	{
+		/* Cache is full: recycle the entry at the tail of the LRU list */
+		BlockHashEntry **link;
+
+		item = lru.prev;
+
+		/* Find the pointer that references the victim in its bucket chain */
+		link = &block_hash_table[item->hash];
+		while (*link != item)
+			link = &(*link)->collision;
+
+		*link = item->collision;
+		unlink_block(item);
+	}
+
+	/* Fill in the entry and publish it in both the bucket and the LRU list */
+	item->tag = *tag;
+	item->hash = bucket;
+	item->collision = block_hash_table[bucket];
+	block_hash_table[bucket] = item;
+	link_block_after(&lru, item);
+
+	return true;
+}
+
+/*
+ * Hash of opened files. It seems to be simpler to maintain own cache rather than provide SMgrRelation for smgr functions.
+ *
+ * The table is direct-mapped: each slot holds at most one open file, and
+ * WalOpenFile() replaces the occupant when another segment hashes to the
+ * same slot.
+ */
+typedef struct FileHashEntry
+{
+ BufferTag tag; /* relation and fork; blockNum holds the segment number, not a block number */
+ File file; /* file handle returned by PathNameOpenFile() */
+} FileHashEntry;
+
+static FileHashEntry file_hash_table[FILE_HASH_SIZE];
+
+/*
+ * Return an open File for the relation segment that contains the block
+ * identified by 'tag', opening it read-only if it is not cached yet.
+ *
+ * Open files are remembered in file_hash_table, a direct-mapped cache keyed
+ * by relation, fork and segment number.  When a newly opened file lands in
+ * an occupied slot, the file previously stored there is closed.
+ *
+ * Returns the result of PathNameOpenFile(); callers treat a negative value
+ * as "segment could not be opened".  Nothing is cached in that case.
+ */
+static File
+WalOpenFile(BufferTag* tag)
+{
+	BufferTag	segment_tag = *tag;
+	FileHashEntry *slot;
+	char	   *path;
+	File		file;
+
+	/* Transform block number into segment number */
+	segment_tag.blockNum /= RELSEG_SIZE;
+	slot = &file_hash_table[BufTableHashCode(&segment_tag) % FILE_HASH_SIZE];
+
+	if (BUFFERTAGS_EQUAL(slot->tag, segment_tag))
+		return slot->file;
+
+	path = relpathperm(tag->rnode, tag->forkNum);
+	if (segment_tag.blockNum > 0)
+	{
+		/* The segment number is unsigned, so it must be printed with %u */
+		char	   *fullpath = psprintf("%s.%u", path, segment_tag.blockNum);
+
+		pfree(path);
+		path = fullpath;
+	}
+	file = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
+
+	if (file >= 0)
+	{
+		elog(LOG_LEVEL, "WAL_PREFETCH: open file %s", path);
+
+		/*
+		 * Close the previous occupant of this slot, if any.  An unused slot
+		 * is all-zero, so its relNode is InvalidOid.  dbNode must not be
+		 * used for this test: it is 0 for shared relations, whose files
+		 * would then never be closed on eviction.
+		 */
+		if (slot->tag.rnode.relNode != InvalidOid)
+			FileClose(slot->file);
+
+		slot->file = file;
+		slot->tag = segment_tag;
+	}
+	pfree(path);
+	return file;
+}
+
+/*
+ * Our backend doesn't receive any notifications about WAL progress, so we
+ * have to poll: sleep on the process latch, using WalPrefetchPollInterval
+ * as the WaitLatch timeout, until the timeout expires or the latch is set.
+ *
+ * On return the latch has been cleared and a pending SIGHUP has been
+ * serviced.  Callers are expected to re-check shutdown_requested themselves.
+ */
+static void
+WalWaitWAL(void)
+{
+	int			rc;
+
+	CHECK_FOR_INTERRUPTS();
+	rc = WaitLatch(MyLatch,
+				   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+				   WalPrefetchPollInterval,
+				   WAIT_EVENT_WAL_PREFETCHER_MAIN);
+
+	/*
+	 * Emergency bailout if postmaster has died.  This is to avoid the
+	 * necessity for manual cleanup of all postmaster children.
+	 */
+	if (rc & WL_POSTMASTER_DEATH)
+		exit(1);
+
+	/*
+	 * Clear the latch.  Without this, a latch that has been set once (for
+	 * example by one of our signal handlers) stays set, every later call
+	 * returns immediately, and the polling loops in our callers spin.  The
+	 * handlers set their flag before setting the latch, and the flags are
+	 * examined after this point, so no request can be lost.
+	 */
+	ResetLatch(MyLatch);
+
+	/*
+	 * The prefetch loop does not go back to the main loop until shutdown, so
+	 * configuration reloads have to be serviced here.
+	 */
+	if (got_SIGHUP)
+	{
+		got_SIGHUP = false;
+		ProcessConfigFile(PGC_SIGHUP);
+	}
+}
+
+/*
+ * Main function: prefetch the blocks referenced by WAL records, starting
+ * from the given LSN, or from the current WAL replay position if lsn is
+ * InvalidXLogRecPtr.
+ *
+ * Runs until a shutdown is requested.  For each block reference of each WAL
+ * record read, the block is skipped if it
+ *	- is already present in the prefetcher's own LRU cache,
+ *	- is found in shared buffers,
+ *	- comes with a full-page image, or
+ *	- belongs to a heap record with XLOG_HEAP_INIT_PAGE set;
+ * otherwise FilePrefetch() is issued for it.  Skipped blocks are still
+ * remembered in the LRU cache so later records do not prefetch them either.
+ *
+ * WalPrefetchMinLead/WalPrefetchMaxLead (in kilobytes) bound how far the
+ * prefetch position runs ahead of the replay position.
+ *
+ * Raises ERROR if memory cannot be allocated or FilePrefetch() fails; the
+ * error is caught in WalPrefetcherMain(), which calls this function again.
+ */
+void
+WalPrefetch(XLogRecPtr lsn)
+{
+	XLogReaderState *xlogreader;
+	long		n_prefetched = 0;
+	long		n_fpw = 0;
+	long		n_cached = 0;
+	long		n_initialized = 0;
+	int			i;
+
+	/* Dirty hack: prevent recovery conflict */
+	MyPgXact->xmin = InvalidTransactionId;
+
+	/*
+	 * We may be re-entered after an error.  Close the files a previous run
+	 * left open before forgetting about them, otherwise they leak.  Occupied
+	 * slots are recognized by a valid relNode (unused slots are all-zero).
+	 */
+	for (i = 0; i < FILE_HASH_SIZE; i++)
+	{
+		if (file_hash_table[i].tag.rnode.relNode != InvalidOid)
+			FileClose(file_hash_table[i].file);
+	}
+	memset(file_hash_table, 0, sizeof file_hash_table);
+
+	/*
+	 * (Re)build an empty block cache.  Entries of a previous run were
+	 * palloc'd in the memory context that the error handler resets, so the
+	 * LRU list must be emptied as well; otherwise it keeps pointing at freed
+	 * memory.
+	 */
+	free(block_hash_table);
+	lru.next = lru.prev = &lru;
+	block_hash_used = 0;
+	block_hash_size = effective_cache_size;
+	block_hash_table = (BlockHashEntry**)calloc(block_hash_size, sizeof(BlockHashEntry*));
+	if (block_hash_table == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+				 errdetail("Failed while allocating the WAL prefetch block cache.")));
+
+	xlogreader = XLogReaderAllocate(wal_segment_size, &WalReadPage, NULL);
+
+	if (!xlogreader)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+				 errdetail("Failed while allocating a WAL reading processor.")));
+
+	if (lsn == InvalidXLogRecPtr)
+		lsn = GetXLogReplayRecPtr(NULL); /* Start with replay LSN */
+
+	while (!shutdown_requested)
+	{
+		char	   *errormsg;
+		int			block_id;
+		XLogRecPtr	replay_lsn = GetXLogReplayRecPtr(&replay_timeline);
+		XLogRecord *record;
+
+		/*
+		 * If current position is behind current replay LSN, then move it forward: we do not want to perform useless job and prefetch
+		 * blocks for already processed WAL records
+		 */
+		if (lsn != InvalidXLogRecPtr || replay_lsn + WalPrefetchMinLead*KB >= xlogreader->EndRecPtr)
+		{
+			XLogRecPtr	prefetch_lsn = replay_lsn != InvalidXLogRecPtr
+				? XLogFindNextRecord(xlogreader, Max(lsn, replay_lsn) + WalPrefetchMinLead*KB) : InvalidXLogRecPtr;
+
+			if (prefetch_lsn == InvalidXLogRecPtr)
+			{
+				elog(LOG_LEVEL, "WAL_PREFETCH: wait for new WAL records at LSN %llx: replay lsn %llx, prefetched %ld, cached %ld, fpw %ld, initialized %ld",
+					 (long long)xlogreader->EndRecPtr, (long long)replay_lsn, n_prefetched, n_cached, n_fpw, n_initialized);
+				WalWaitWAL();
+				continue;
+			}
+			lsn = prefetch_lsn;
+		}
+
+		/*
+		 * Now opposite check: if prefetch goes too far from replay position, then suspend it for a while
+		 */
+		if (WalPrefetchMaxLead != 0 && replay_lsn + WalPrefetchMaxLead*KB < xlogreader->EndRecPtr)
+		{
+			elog(LOG_LEVEL, "WAL_PREFETCH: wait for recovery at LSN %llx, replay LSN %llx",
+				 (long long)xlogreader->EndRecPtr, (long long)replay_lsn);
+			WalWaitWAL();
+			continue;
+		}
+
+		record = XLogReadRecord(xlogreader, lsn, &errormsg);
+
+		if (record != NULL)
+		{
+			lsn = InvalidXLogRecPtr; /* continue with next record */
+
+			/* Loop through blocks referenced by this WAL record */
+			for (block_id = 0; block_id <= xlogreader->max_block_id; block_id++)
+			{
+				BufferTag	tag;
+				File		file;
+
+				if (!XLogRecGetBlockTag(xlogreader, block_id, &tag.rnode, &tag.forkNum, &tag.blockNum))
+					continue;
+
+				/* Check if block already prefetched */
+				if (!put_block_in_cache(&tag))
+					continue;
+
+				/* Check if block is cached in shared buffers */
+				if (IsBlockCached(&tag))
+				{
+					n_cached += 1;
+					continue;
+				}
+
+				/* Do not prefetch full pages */
+				if (XLogRecHasBlockImage(xlogreader, block_id))
+				{
+					n_fpw += 1;
+					continue;
+				}
+
+				/* Ignore initialized pages */
+				if (XLogRecGetRmid(xlogreader) == RM_HEAP_ID
+					&& (XLogRecGetInfo(xlogreader) & XLOG_HEAP_INIT_PAGE))
+				{
+					n_initialized += 1;
+					continue;
+				}
+
+				file = WalOpenFile(&tag);
+				if (file >= 0)
+				{
+					off_t		offs = (off_t) BLCKSZ * (tag.blockNum % ((BlockNumber) RELSEG_SIZE));
+					int			rc;
+#if DEBUG_PREFETCH
+					instr_time	start, stop;
+					INSTR_TIME_SET_CURRENT(start);
+#endif
+					rc = FilePrefetch(file, offs, BLCKSZ, WAIT_EVENT_DATA_FILE_PREFETCH);
+					if (rc != 0)
+						elog(ERROR, "WAL_PREFETCH: failed to prefetch file: %m");
+					else if (++n_prefetched % STAT_REFRESH_PERIOD == 0)
+					{
+						char		buf[1024];
+
+						/* bounded formatting: never write past the end of buf */
+						snprintf(buf, sizeof(buf), "Prefetch %ld blocks at LSN %llx, replay LSN %llx",
+								 n_prefetched, (long long)xlogreader->ReadRecPtr, (long long)replay_lsn);
+						pgstat_report_activity(STATE_RUNNING, buf);
+						elog(DEBUG1, "%s", buf);
+					}
+#if DEBUG_PREFETCH
+					INSTR_TIME_SET_CURRENT(stop);
+					INSTR_TIME_SUBTRACT(stop,start);
+					elog(LOG, "WAL_PREFETCH: %x/%x prefetch block %d fork %d of relation %d at LSN %llx, replay LSN %llx (%u usec), %ld prefetched, %ld cached, %ld fpw, %ld initialized",
+						 XLogRecGetRmid(xlogreader), XLogRecGetInfo(xlogreader),
+						 tag.blockNum, tag.forkNum, tag.rnode.relNode, (long long)xlogreader->ReadRecPtr, (long long)replay_lsn,
+						 (int)INSTR_TIME_GET_MICROSEC(stop), n_prefetched, n_cached, n_fpw, n_initialized);
+#endif
+				}
+				else
+					elog(LOG, "WAL_PREFETCH: file segment doesn't exists");
+			}
+		}
+		else
+		{
+			elog(LOG, "WAL_PREFETCH: wait for valid record at LSN %llx, replay_lsn %llx: %s",
+				 (long long)xlogreader->EndRecPtr, (long long)replay_lsn, errormsg);
+			WalWaitWAL();
+		}
+	}
+
+	/* Shutdown requested: release the reader before returning */
+	XLogReaderFree(xlogreader);
+}
+
+/*
+ * Page-read callback handed to the prefetcher's XLogReader.
+ *
+ * Almost a copy of read_local_xlog_page from xlogutils.c, but it reads up to
+ * the flush position of the WAL receiver rather than the replay position.
+ *
+ * Returns the number of valid bytes placed in cur_page, or -1 if a shutdown
+ * was requested while waiting or the requested data is not available.
+ */
+static int
+WalReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr,
+			int reqLen, XLogRecPtr targetRecPtr, char *cur_page,
+			TimeLineID *pageTLI)
+{
+	XLogRecPtr	wanted_end = targetPagePtr + reqLen;
+	XLogRecPtr	limit;
+	int			nvalid;
+
+	/* Loop waiting for xlog to be available if necessary */
+	for (;;)
+	{
+		/*
+		 * With an active WAL receiver, read up to the end of the received
+		 * data.  Without one, read to the end of WAL if recovery is in
+		 * progress (recovery at startup); otherwise there is nothing to
+		 * read and we just sleep.
+		 */
+		if (WalRcv->walRcvState != WALRCV_STOPPED)
+			limit = WalRcv->receivedUpto;
+		else if (RecoveryInProgress())
+			limit = (XLogRecPtr)-1;
+		else
+			limit = InvalidXLogRecPtr;
+
+		*pageTLI = replay_timeline;
+
+		if (wanted_end <= limit)
+			break;
+
+		elog(LOG_LEVEL, "WAL_PREFETCH: wait for new WAL records at LSN %llx, read up to lsn %llx",
+			 (long long)wanted_end, (long long)limit);
+		WalWaitWAL();
+		CHECK_FOR_INTERRUPTS();
+		if (shutdown_requested)
+			return -1;
+	}
+
+	if (targetPagePtr + XLOG_BLCKSZ <= limit)
+	{
+		/*
+		 * At least one whole block is available: report just that block and
+		 * let the caller come back if it needs more.
+		 */
+		nvalid = XLOG_BLCKSZ;
+	}
+	else if (targetPagePtr + reqLen > limit)
+	{
+		/* not enough data there */
+		return -1;
+	}
+	else
+	{
+		/* the request can be satisfied from a partially filled page */
+		nvalid = limit - targetPagePtr;
+	}
+
+	/*
+	 * Regardless of how many bytes were just determined to be valid, read
+	 * the whole page.  It's guaranteed to be zero-padded up to the page
+	 * boundary if it's incomplete.
+	 */
+	XLogRead(cur_page, state->wal_segment_size, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
+
+	/* number of valid bytes in the buffer */
+	return nvalid;
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e47ddca..6319ed5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -252,7 +252,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
-static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+static void WalSndRead(char *buf, XLogRecPtr startptr, Size count);
/* Initialize walsender process before entering the main command loop */
@@ -771,7 +771,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
count = flushptr - targetPagePtr; /* part of the page available */
/* now actually read the data, we know it's there */
- XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
+ WalSndRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
return count;
}
@@ -2314,7 +2314,7 @@ WalSndKill(int code, Datum arg)
* more than one.
*/
static void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+WalSndRead(char *buf, XLogRecPtr startptr, Size count)
{
char *p;
XLogRecPtr recptr;
@@ -2710,7 +2710,7 @@ XLogSendPhysical(void)
* calls.
*/
enlargeStringInfo(&output_message, nbytes);
- XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+ WalSndRead(&output_message.data[output_message.len], startptr, nbytes);
output_message.len += nbytes;
output_message.data[output_message.len] = '\0';
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 01eabe5..82849dd 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -585,6 +585,19 @@ PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
#endif /* USE_PREFETCH */
}
+/*
+ * IsBlockCached -- report whether the block identified by 'tag' has an
+ * entry in the shared buffer mapping table.
+ *
+ * The buffer mapping partition lock is held in shared mode only for the
+ * duration of the lookup and is released before returning.
+ */
+bool
+IsBlockCached(BufferTag* tag)
+{
+	uint32		tag_hash;
+	LWLock	   *partition_lock;
+	int			buf_id;
+
+	tag_hash = BufTableHashCode(tag);
+	partition_lock = BufMappingPartitionLock(tag_hash);
+
+	LWLockAcquire(partition_lock, LW_SHARED);
+	buf_id = BufTableLookup(tag, tag_hash);
+	LWLockRelease(partition_lock);
+
+	return buf_id >= 0;
+}
/*
* ReadBuffer -- a shorthand for ReadBufferExtended, for reading from main
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 2ec103e..4d337b9 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -43,6 +43,8 @@
#define FSYNCS_PER_ABSORB 10
#define UNLINKS_PER_ABSORB 10
+/* #define DEBUG_PREFETCH 1 */
+
/*
* Special values for the segno arg to RememberFsyncRequest.
*
@@ -733,7 +735,10 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
off_t seekpos;
int nbytes;
MdfdVec *v;
-
+#if DEBUG_PREFETCH
+ instr_time start, stop;
+ INSTR_TIME_SET_CURRENT(start);
+#endif
TRACE_POSTGRESQL_SMGR_MD_READ_START(forknum, blocknum,
reln->smgr_rnode.node.spcNode,
reln->smgr_rnode.node.dbNode,
@@ -788,6 +793,14 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
blocknum, FilePathName(v->mdfd_vfd),
nbytes, BLCKSZ)));
}
+#if DEBUG_PREFETCH
+	/* Debug aid: log the time elapsed since 'start' was taken at function entry. */
+	INSTR_TIME_SET_CURRENT(stop);
+	INSTR_TIME_SUBTRACT(stop, start);
+	elog(LOG, "Read block %u fork %d of relation %u (%d usec)",
+		 blocknum, forknum, reln->smgr_rnode.node.relNode,
+		 (int) INSTR_TIME_GET_MICROSEC(stop));
+#endif
}
/*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index fa3c8a7..2e627b2 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -61,6 +61,7 @@
#include "postmaster/bgwriter.h"
#include "postmaster/postmaster.h"
#include "postmaster/syslogger.h"
+#include "postmaster/walprefetcher.h"
#include "postmaster/walwriter.h"
#include "replication/logicallauncher.h"
#include "replication/slot.h"
@@ -1823,6 +1824,17 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
+ {
+ {"wal_prefetch_enabled", PGC_USERSET, DEVELOPER_OPTIONS,
+ gettext_noop("Allow prefetch of blocks referenced by WAL records."),
+ NULL,
+ GUC_NOT_IN_SAMPLE
+ },
+ &WalPrefetchEnabled,
+ false,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -2487,6 +2499,39 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"wal_prefetch_min_lead", PGC_SIGHUP, WAL_SETTINGS,
+ gettext_noop("Minimal lead (kb) between WAL replay LSN and prefetched LSN."),
+ NULL,
+ GUC_UNIT_KB
+ },
+ &WalPrefetchMinLead,
+ 0, 0, INT_MAX, /* boot value, min, max (in kB, see GUC_UNIT_KB) */
+ NULL, NULL, NULL
+ },
+
+ {
+ {"wal_prefetch_max_lead", PGC_SIGHUP, WAL_SETTINGS,
+ gettext_noop("Maximal lead (kb) between WAL replay LSN and prefetched LSN."),
+ NULL,
+ GUC_UNIT_KB
+ },
+ &WalPrefetchMaxLead,
+ 0, 0, INT_MAX, /* boot value, min, max (in kB, see GUC_UNIT_KB) */
+ NULL, NULL, NULL
+ },
+
+ {
+ {"wal_prefetch_poll_interval", PGC_SIGHUP, WAL_SETTINGS,
+ gettext_noop("Interval of polling WAL by WAL prefetcher."),
+ NULL,
+ GUC_UNIT_MS
+ },
+ &WalPrefetchPollInterval,
+ 100, 1, 10000,
+ NULL, NULL, NULL
+ },
+
+ {
{"wal_writer_delay", PGC_SIGHUP, WAL_SETTINGS,
gettext_noop("Time between WAL flushes performed in the WAL writer."),
NULL,
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index f307b63..70eac88 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -212,9 +212,7 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
/* Invalidate read state */
extern void XLogReaderInvalReadState(XLogReaderState *state);
-#ifdef FRONTEND
extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
-#endif /* FRONTEND */
/* Functions for decoding an XLogRecord */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index c406699..c2b8a6f 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -55,4 +55,7 @@ extern int read_local_xlog_page(XLogReaderState *state,
extern void XLogReadDetermineTimeline(XLogReaderState *state,
XLogRecPtr wantPage, uint32 wantLength);
+extern void XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
+ Size count);
+
#endif
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index e167ee8..5f8b67d 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -400,6 +400,7 @@ typedef enum
CheckpointerProcess,
WalWriterProcess,
WalReceiverProcess,
+ WalPrefetcherProcess,
NUM_AUXPROCTYPES /* Must be last! */
} AuxProcType;
@@ -412,6 +413,7 @@ extern AuxProcType MyAuxProcType;
#define AmCheckpointerProcess() (MyAuxProcType == CheckpointerProcess)
#define AmWalWriterProcess() (MyAuxProcType == WalWriterProcess)
#define AmWalReceiverProcess() (MyAuxProcType == WalReceiverProcess)
+#define AmWalPrefetcherProcess() (MyAuxProcType == WalPrefetcherProcess)
/*****************************************************************************
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index be2f592..20ab699 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -710,7 +710,8 @@ typedef enum BackendType
B_STARTUP,
B_WAL_RECEIVER,
B_WAL_SENDER,
- B_WAL_WRITER
+ B_WAL_WRITER,
+ B_WAL_PREFETCHER
} BackendType;
@@ -767,7 +768,8 @@ typedef enum
WAIT_EVENT_SYSLOGGER_MAIN,
WAIT_EVENT_WAL_RECEIVER_MAIN,
WAIT_EVENT_WAL_SENDER_MAIN,
- WAIT_EVENT_WAL_WRITER_MAIN
+ WAIT_EVENT_WAL_WRITER_MAIN,
+ WAIT_EVENT_WAL_PREFETCHER_MAIN
} WaitEventActivity;
/* ----------
diff --git a/src/include/postmaster/walprefetcher.h b/src/include/postmaster/walprefetcher.h
new file mode 100644
index 0000000..de156a9
--- /dev/null
+++ b/src/include/postmaster/walprefetcher.h
@@ -0,0 +1,29 @@
+/*-------------------------------------------------------------------------
+ *
+ * walprefetcher.h
+ * Exports from postmaster/walprefetcher.c.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ *
+ * src/include/postmaster/walprefetcher.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _WALPREFETCHER_H
+#define _WALPREFETCHER_H
+
+#include "access/xlogdefs.h" /* XLogRecPtr, needed by WalPrefetch() below */
+
+/* GUC options (registered in utils/misc/guc.c) */
+extern int WalPrefetchMinLead; /* wal_prefetch_min_lead, in kB */
+extern int WalPrefetchMaxLead; /* wal_prefetch_max_lead, in kB */
+extern int WalPrefetchPollInterval; /* wal_prefetch_poll_interval, in ms */
+extern bool WalPrefetchEnabled; /* wal_prefetch_enabled */
+
+/* Main function of the WAL prefetcher auxiliary process; declared noreturn. */
+extern void WalPrefetcherMain(void) pg_attribute_noreturn();
+
+/* NOTE(review): the meaning of 'lsn' is defined in walprefetcher.c -- confirm there. */
+extern void WalPrefetch(XLogRecPtr lsn);
+
+#endif /* _WALPREFETCHER_H */
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 5370035..aa1ca73 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -337,5 +337,6 @@ extern void DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum,
BlockNumber firstDelBlock);
extern void DropRelFileNodeAllLocalBuffers(RelFileNode rnode);
extern void AtEOXact_LocalBuffers(bool isCommit);
+extern bool IsBlockCached(BufferTag *tag);
#endif /* BUFMGR_INTERNALS_H */