Hi,

Here's the "simple patch" that I'm currently experimenting with. It essentially replaces open/close/write/fsync with pmem calls (map/unmap/memcpy/persist variants), and it's by no means committable. But it works well enough for experiments, measurements, etc.
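The substitution described above (open/write/fsync/close replaced by map/memcpy/persist/unmap) can be illustrated with a minimal sketch. This is not code from either patch. It uses plain POSIX mmap()/msync()/munmap() as a stand-in for libpmem's pmem_map_file(), pmem_memcpy_nodrain(), pmem_persist() and pmem_unmap(). The helper names (map_segment, write_at, persist_range, unmap_segment) and the demo segment size are hypothetical.

```c
/*
 * Sketch only: POSIX mmap()/msync() standing in for libpmem calls.
 * Helper names and DEMO_SEG_SIZE are hypothetical, for illustration.
 */
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <fcntl.h>
#include <stddef.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <unistd.h>

/* hypothetical demo size; real WAL segments are 16MB by default */
#define DEMO_SEG_SIZE ((size_t) 1024 * 1024)

/* "open": create/extend the file to len bytes and map it (cf. pmem_map_file) */
static void *
map_segment(const char *path, size_t len)
{
	int			fd = open(path, O_RDWR | O_CREAT, 0600);
	void	   *addr;

	if (fd < 0)
		return NULL;
	if (ftruncate(fd, (off_t) len) != 0)
	{
		close(fd);
		return NULL;
	}
	addr = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
	close(fd);					/* the mapping stays valid after close() */
	return (addr == MAP_FAILED) ? NULL : addr;
}

/* "write": copy bytes into the mapping at an offset (cf. pmem_memcpy_nodrain) */
static void
write_at(void *base, size_t offset, const void *src, size_t len)
{
	memcpy((char *) base + offset, src, len);
}

/* "fsync": flush only [from, to) (cf. pmem_persist); msync() needs a page-aligned start */
static int
persist_range(void *base, size_t from, size_t to)
{
	size_t		pagesz = (size_t) sysconf(_SC_PAGESIZE);
	size_t		start = from - (from % pagesz);

	assert(from <= to);
	return msync((char *) base + start, to - start, MS_SYNC);
}

/* "close": drop the mapping (cf. pmem_unmap) */
static int
unmap_segment(void *base, size_t len)
{
	return munmap(base, len);
}
```

The key difference from the real thing: when the mapping really is pmem, libpmem's pmem_persist() flushes CPU caches from user space instead of going through the kernel with msync(). The sketch only shows the shape of the calls: the "write" becomes a memcpy into the mapping, and the "fsync" can cover just the byte range that changed.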
The numbers (5-minute pgbench runs on scale 500) look like this:

clients   master/btt   master/dax      ntt   simple
---------------------------------------------------
      1         5469         7402     7977     6746
     16        48222        80869   107025    82343
     32        73974       158189   214718   158348
     64        85921       154540   225715   164248
     96       150602       221159   237008   217253

A chart illustrating these results is attached. The four columns show unpatched master with WAL on a pmem device in BTT or DAX mode, "ntt" (the patch submitted to this thread), and "simple" (the patch I've hacked together).

As expected, the BTT case performs poorly (compared to the rest). The "master/dax" and "simple" cases perform about the same; there are some differences, but those may be attributed to noise. The NTT patch does outperform them, by ~20-40% in some cases. The question is why.

I recall suggestions that this is due to page faults when writing data into the WAL. But I experimented with various settings that I think should prevent that (e.g. disabling WAL reuse and/or disabling zeroing of the segments), and that made no measurable difference.

So I've added some primitive instrumentation to the code, counting the calls and measuring the duration of each PMEM operation, and printing the stats into the log regularly (after every ~1M ops). Typical results from a run with a single client look like this (slightly formatted/wrapped for e-mail):

PMEM STATS
COUNT   total 1000000  map 30  unmap 20  memcpy 510210  persist 489740
TIME    total 0  map 931080  unmap 188750  memcpy 4938866752  persist 187846686
LENGTH  memcpy 4337647616  persist 329824672

This shows that the majority of the 1M calls are memcpy/persist; the rest is mostly negligible, both in terms of number of calls and duration. The time values are in nanoseconds, BTW. So for example we did 30 map_file calls, taking ~0.9ms in total, and the unmap calls took even less time. So the direct impact of map/unmap calls is rather negligible, I think.

The dominant part is clearly the memcpy (~4.9s) and persist (~0.2s). It's not much per call, but overall it costs much more than the map and unmap calls.

Finally, let's look at the LENGTH, which is the sum of the ranges either copied to PMEM (memcpy) or flushed (persist). Those are in bytes, and the memcpy value is way higher than the persist one. In this particular case, it's something like 4.3GB vs. 330MB, so an order of magnitude.

It's entirely possible this is a bug/measurement error in the patch. I'm not all that familiar with the XLOG stuff, so maybe I made some silly mistake somewhere. But I think it might also be explained by the fact that XLogWrite() always writes the WAL in multiples of 8kB pages. That is perfectly reasonable for regular block-oriented storage, but pmem/dax is exactly about not having to do that: PMEM is byte-addressable. And with pgbench, the individual WAL records are tiny, so having to write/flush the whole 8kB page (or several of them) repeatedly as we append WAL records seems a bit wasteful. So I wonder if this is why the trivial patch does not show any benefits.

regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
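As a quick sanity check of the 8kB-page hypothesis, the per-call averages can be derived from the single-client PMEM STATS line. The sketch below is plain arithmetic on the reported counters. The struct and helper names are made up for illustration, and XLOG_BLCKSZ is assumed to be the default 8192.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed default WAL page size (it is a build-time option in PostgreSQL). */
#define XLOG_BLCKSZ 8192

/* Counters for one operation from the "PMEM STATS" log line (hypothetical struct). */
typedef struct
{
	int64_t		count;			/* number of calls */
	int64_t		time_ns;		/* total time, nanoseconds */
	int64_t		length;			/* total bytes copied / persisted */
} op_stats;

/* Values copied from the single-client stats shown in the message. */
static const op_stats memcpy_stats = {510210, INT64_C(4938866752), INT64_C(4337647616)};
static const op_stats persist_stats = {489740, INT64_C(187846686), INT64_C(329824672)};

/* Average duration of one call, in nanoseconds. */
static double
ns_per_call(const op_stats *s)
{
	assert(s->count > 0);
	return (double) s->time_ns / (double) s->count;
}

/* Average number of bytes handled by one call. */
static double
bytes_per_call(const op_stats *s)
{
	assert(s->count > 0);
	return (double) s->length / (double) s->count;
}

/* Average bytes per call, expressed in WAL pages. */
static double
pages_per_call(const op_stats *s)
{
	return bytes_per_call(s) / XLOG_BLCKSZ;
}
```

The results work out to roughly 9.7 µs and ~8.5kB per memcpy (about one WAL page), and about 0.38 µs and ~670 bytes per persist. That would be consistent with whole pages being copied on each write while only the newly flushed byte range is persisted. These averages lump all memcpy calls together, including any segment zero-filling, so they are only indicative.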
diff --git a/configure b/configure
index dd64692345..91226ee880 100755
--- a/configure
+++ b/configure
@@ -867,6 +867,7 @@ with_libxml
 with_libxslt
 with_system_tzdata
 with_zlib
+with_nvwal
 with_gnu_ld
 enable_largefile
 '
@@ -1572,6 +1573,7 @@ Optional Packages:
                           use system time zone data in DIR
   --without-zlib          do not use Zlib
   --with-gnu-ld           assume the C compiler uses GNU ld [default=no]
+  --with-nvwal            use non-volatile WAL buffer (on a PMEM device)
 
 Some influential environment variables:
   CC          C compiler command
@@ -8598,6 +8600,203 @@ else
 fi
 
 
+#
+# Non-volatile WAL buffer (NVWAL)
+#
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking whether to build with non-volatile WAL buffer (NVWAL)" >&5
+$as_echo_n "checking whether to build with non-volatile WAL buffer (NVWAL)... " >&6; }
+
+
+
+# Check whether --with-nvwal was given.
+if test "${with_nvwal+set}" = set; then :
+  withval=$with_nvwal;
+  case $withval in
+    yes)
+
+$as_echo "#define USE_NVWAL 1" >>confdefs.h
+
+      ;;
+    no)
+      :
+      ;;
+    *)
+      as_fn_error $? "no argument expected for --with-nvwal option" "$LINENO" 5
+      ;;
+  esac
+
+else
+  with_nvwal=no
+
+fi
+
+
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $with_nvwal" >&5
+$as_echo "$with_nvwal" >&6; }
+
+#
+# Elf
+#
+
+# Assume system is ELF if it predefines __ELF__ as 1,
+# otherwise believe host_os based default.
+case $host_os in
+  freebsd1*|freebsd2*) elf=no;;
+  freebsd3*|freebsd4*) elf=yes;;
+esac
+
+
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for grep that handles long lines and -e" >&5
+$as_echo_n "checking for grep that handles long lines and -e... " >&6; }
+if ${ac_cv_path_GREP+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  if test -z "$GREP"; then
+  ac_path_GREP_found=false
+  # Loop through the user's path and test for each of PROGNAME-LIST
+  as_save_IFS=$IFS; IFS=$PATH_SEPARATOR
+for as_dir in $PATH$PATH_SEPARATOR/usr/xpg4/bin
+do
+  IFS=$as_save_IFS
+  test -z "$as_dir" && as_dir=.
+    for ac_prog in grep ggrep; do
+    for ac_exec_ext in '' $ac_executable_extensions; do
+      ac_path_GREP="$as_dir/$ac_prog$ac_exec_ext"
+      as_fn_executable_p "$ac_path_GREP" || continue
+# Check for GNU ac_path_GREP and select it if it is found.
+  # Check for GNU $ac_path_GREP
+case `"$ac_path_GREP" --version 2>&1` in
+*GNU*)
+  ac_cv_path_GREP="$ac_path_GREP" ac_path_GREP_found=:;;
+*)
+  ac_count=0
+  $as_echo_n 0123456789 >"conftest.in"
+  while :
+  do
+    cat "conftest.in" "conftest.in" >"conftest.tmp"
+    mv "conftest.tmp" "conftest.in"
+    cp "conftest.in" "conftest.nl"
+    $as_echo 'GREP' >> "conftest.nl"
+    "$ac_path_GREP" -e 'GREP$' -e '-(cannot match)-' < "conftest.nl" >"conftest.out" 2>/dev/null || break
+    diff "conftest.out" "conftest.nl" >/dev/null 2>&1 || break
+    as_fn_arith $ac_count + 1 && ac_count=$as_val
+    if test $ac_count -gt ${ac_path_GREP_max-0}; then
+      # Best one so far, save it but keep looking for a better one
+      ac_cv_path_GREP="$ac_path_GREP"
+      ac_path_GREP_max=$ac_count
+    fi
+    # 10*(2^10) chars as input seems more than enough
+    test $ac_count -gt 10 && break
+  done
+  rm -f conftest.in conftest.tmp conftest.nl conftest.out;;
+esac
+
+      $ac_path_GREP_found && break 3
+    done
+  done
+  done
+IFS=$as_save_IFS
+  if test -z "$ac_cv_path_GREP"; then
+    as_fn_error $? "no acceptable grep could be found in $PATH$PATH_SEPARATOR/usr/xpg4/bin" "$LINENO" 5
+  fi
+else
+  ac_cv_path_GREP=$GREP
+fi
+
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_path_GREP" >&5
+$as_echo "$ac_cv_path_GREP" >&6; }
+ GREP="$ac_cv_path_GREP"
+
+
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for egrep" >&5
+$as_echo_n "checking for egrep... " >&6; }
+if ${ac_cv_path_EGREP+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  if echo a | $GREP -E '(a|b)' >/dev/null 2>&1
+   then ac_cv_path_EGREP="$GREP -E"
+   else
+     if test -z "$EGREP"; then
+  ac_path_EGREP_found=false
+  # Loop through the user's path and test for each of PROGNAME-LIST
+  as_save_IFS=$IFS; IFS=$PATH_SEPARATOR
+for as_dir in $PATH$PATH_SEPARATOR/usr/xpg4/bin
+do
+  IFS=$as_save_IFS
+  test -z "$as_dir" && as_dir=.
+    for ac_prog in egrep; do
+    for ac_exec_ext in '' $ac_executable_extensions; do
+      ac_path_EGREP="$as_dir/$ac_prog$ac_exec_ext"
+      as_fn_executable_p "$ac_path_EGREP" || continue
+# Check for GNU ac_path_EGREP and select it if it is found.
+  # Check for GNU $ac_path_EGREP
+case `"$ac_path_EGREP" --version 2>&1` in
+*GNU*)
+  ac_cv_path_EGREP="$ac_path_EGREP" ac_path_EGREP_found=:;;
+*)
+  ac_count=0
+  $as_echo_n 0123456789 >"conftest.in"
+  while :
+  do
+    cat "conftest.in" "conftest.in" >"conftest.tmp"
+    mv "conftest.tmp" "conftest.in"
+    cp "conftest.in" "conftest.nl"
+    $as_echo 'EGREP' >> "conftest.nl"
+    "$ac_path_EGREP" 'EGREP$' < "conftest.nl" >"conftest.out" 2>/dev/null || break
+    diff "conftest.out" "conftest.nl" >/dev/null 2>&1 || break
+    as_fn_arith $ac_count + 1 && ac_count=$as_val
+    if test $ac_count -gt ${ac_path_EGREP_max-0}; then
+      # Best one so far, save it but keep looking for a better one
+      ac_cv_path_EGREP="$ac_path_EGREP"
+      ac_path_EGREP_max=$ac_count
+    fi
+    # 10*(2^10) chars as input seems more than enough
+    test $ac_count -gt 10 && break
+  done
+  rm -f conftest.in conftest.tmp conftest.nl conftest.out;;
+esac
+
+      $ac_path_EGREP_found && break 3
+    done
+  done
+  done
+IFS=$as_save_IFS
+  if test -z "$ac_cv_path_EGREP"; then
+    as_fn_error $? "no acceptable egrep could be found in $PATH$PATH_SEPARATOR/usr/xpg4/bin" "$LINENO" 5
+  fi
+else
+  ac_cv_path_EGREP=$EGREP
+fi
+
+   fi
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_path_EGREP" >&5
+$as_echo "$ac_cv_path_EGREP" >&6; }
+ EGREP="$ac_cv_path_EGREP"
+
+
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h. */
+#if __ELF__
+  yes
+#endif
+
+_ACEOF
+if (eval "$ac_cpp conftest.$ac_ext") 2>&5 |
+  $EGREP "yes" >/dev/null 2>&1; then :
+  ELF_SYS=true
+else
+  if test "X$elf" = "Xyes" ; then
+  ELF_SYS=true
+else
+  ELF_SYS=
+fi
+fi
+rm -f conftest*
+
+
+
 
 
 
@@ -12962,6 +13161,57 @@ fi
 fi
 
 
+# for non-volatile WAL buffer (NVWAL)
+if test "$with_nvwal" = yes; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for pmem_map_file in -lpmem" >&5
+$as_echo_n "checking for pmem_map_file in -lpmem... " >&6; }
+if ${ac_cv_lib_pmem_pmem_map_file+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-lpmem $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h. */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply. */
+#ifdef __cplusplus
+extern "C"
+#endif
+char pmem_map_file ();
+int
+main ()
+{
+return pmem_map_file ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_pmem_pmem_map_file=yes
+else
+  ac_cv_lib_pmem_pmem_map_file=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_pmem_pmem_map_file" >&5
+$as_echo "$ac_cv_lib_pmem_pmem_map_file" >&6; }
+if test "x$ac_cv_lib_pmem_pmem_map_file" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBPMEM 1
+_ACEOF
+
+  LIBS="-lpmem $LIBS"
+
+else
+  as_fn_error $? "library 'libpmem' is required for non-volatile WAL buffer (NVWAL)" "$LINENO" 5
+fi
+
+fi
+
 
 ##
 ## Header files
@@ -13641,6 +13891,18 @@ fi
 done
 
 
+
+fi
+
+# for non-volatile WAL buffer (NVWAL)
+if test "$with_nvwal" = yes ; then
+  ac_fn_c_check_header_mongrel "$LINENO" "libpmem.h" "ac_cv_header_libpmem_h" "$ac_includes_default"
+if test "x$ac_cv_header_libpmem_h" = xyes; then :
+
+else
+  as_fn_error $? "header file <libpmem.h> is required for non-volatile WAL buffer (NVWAL)" "$LINENO" 5
+fi
+
 fi
 
 if test "$PORTNAME" = "win32" ; then
diff --git a/configure.ac b/configure.ac
index 748fb50236..460a227fe7 100644
--- a/configure.ac
+++ b/configure.ac
@@ -999,6 +999,38 @@ PGAC_ARG_BOOL(with, zlib, yes,
               [do not use Zlib])
 AC_SUBST(with_zlib)
 
+#
+# Non-volatile WAL buffer (NVWAL)
+#
+AC_MSG_CHECKING([whether to build with non-volatile WAL buffer (NVWAL)])
+PGAC_ARG_BOOL(with, nvwal, no, [use non-volatile WAL buffer (NVWAL)],
+              [AC_DEFINE([USE_NVWAL], 1, [Define to 1 to use non-volatile WAL buffer (NVWAL). (--with-nvwal)])])
+AC_MSG_RESULT([$with_nvwal])
+
+#
+# Elf
+#
+
+# Assume system is ELF if it predefines __ELF__ as 1,
+# otherwise believe host_os based default.
+case $host_os in
+  freebsd1*|freebsd2*) elf=no;;
+  freebsd3*|freebsd4*) elf=yes;;
+esac
+
+AC_EGREP_CPP(yes,
+[#if __ELF__
+  yes
+#endif
+],
+[ELF_SYS=true],
+[if test "X$elf" = "Xyes" ; then
+  ELF_SYS=true
+else
+  ELF_SYS=
+fi])
+AC_SUBST(ELF_SYS)
+
 #
 # Assignments
 #
@@ -1303,6 +1335,12 @@ elif test "$with_uuid" = ossp ; then
 fi
 AC_SUBST(UUID_LIBS)
 
+# for non-volatile WAL buffer (NVWAL)
+if test "$with_nvwal" = yes; then
+  AC_CHECK_LIB(pmem, pmem_map_file, [],
+               [AC_MSG_ERROR([library 'libpmem' is required for non-volatile WAL buffer (NVWAL)])])
+fi
+
 
 ##
 ## Header files
@@ -1480,6 +1518,11 @@ elif test "$with_uuid" = ossp ; then
       [AC_MSG_ERROR([header file <ossp/uuid.h> or <uuid.h> is required for OSSP UUID])])])
 fi
 
+# for non-volatile WAL buffer (NVWAL)
+if test "$with_nvwal" = yes ; then
+  AC_CHECK_HEADER(libpmem.h, [], [AC_MSG_ERROR([header file <libpmem.h> is required for non-volatile WAL buffer (NVWAL)])])
+fi
+
 if test "$PORTNAME" = "win32" ; then
    AC_CHECK_HEADERS(crtdefs.h)
 fi
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 13f1d8c3dc..79a479fcf5 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -21,6 +21,11 @@
 #include <sys/stat.h>
 #include <sys/time.h>
 #include <unistd.h>
+#include <libpmem.h>
+
+#include <stdint.h> /* for uint64 definition */
+#include <stdlib.h> /* for exit() definition */
+#include <time.h> /* for clock_gettime */
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
@@ -43,6 +48,7 @@
 #include "commands/progress.h"
 #include "commands/tablespace.h"
 #include "common/controldata_utils.h"
+#include "common/file_perm.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
@@ -795,7 +801,7 @@ static const char *const xlogSourceNames[] = {"any", "archive", "pg_wal", "strea
  * write the XLOG, and so will normally refer to the active segment.
  * Note: call Reserve/ReleaseExternalFD to track consumption of this FD.
  */
-static int openLogFile = -1;
+static void *openLogFile = NULL;
 static XLogSegNo openLogSegNo = 0;
 
 /*
@@ -970,6 +976,189 @@ static void WALInsertLockAcquireExclusive(void);
 static void WALInsertLockRelease(void);
 static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
 
+
+#define PMEM_DEBUG
+
+static int64
+time_delta(struct timespec *start, struct timespec *end)
+{
+	return (int64) (end->tv_sec - start->tv_sec) * 1000000000L +
+		(end->tv_nsec - start->tv_nsec);
+}
+
+typedef struct {
+
+	int64 n_total;
+	int64 n_mmap;
+	int64 n_munmap;
+	int64 n_memcpy;
+	int64 n_persist;
+
+	int64 t_total;
+	int64 t_mmap;
+	int64 t_munmap;
+	int64 t_memcpy;
+	int64 t_persist;
+
+	int64 l_memcpy;
+	int64 l_persist;
+
+} pmem_stats;
+
+static pmem_stats stats;
+static bool stats_initialized = false;
+
+static inline void
+init_stats(void)
+{
+	if (!stats_initialized)
+		memset(&stats, 0, sizeof(pmem_stats));
+	stats_initialized = true;
+}
+
+static inline void
+print_stats(void)
+{
+	if (stats.n_total >= 1000000)
+	{
+		elog(LOG, "PMEM STATS COUNT total %ld map %ld unmap %ld memcpy %ld persist %ld TIME total %ld map %ld unmap %ld memcpy %ld persist %ld LENGTH memcpy %ld persist %ld",
+			 stats.n_total, stats.n_mmap, stats.n_munmap, stats.n_memcpy, stats.n_persist,
+			 stats.t_total, stats.t_mmap, stats.t_munmap, stats.t_memcpy, stats.t_persist,
+			 stats.l_memcpy, stats.l_persist);
+
+		memset(&stats, 0, sizeof(pmem_stats));
+	}
+}
+
+static void *
+pg_pmem_memcpy_nodrain(void *dst, void *src, size_t len)
+{
+	void *ret;
+
+#ifdef PMEM_DEBUG
+	struct timespec start, end;
+	int64 delta;
+
+	init_stats();
+
+	clock_gettime(CLOCK_MONOTONIC, &start);
+#endif
+	ret = pmem_memcpy_nodrain(dst, src, len);
+
+#ifdef PMEM_DEBUG
+
+	clock_gettime(CLOCK_MONOTONIC, &end);
+
+	delta = time_delta(&start, &end);
+
+	stats.n_total += 1;
+	stats.n_memcpy += 1;
+	stats.t_memcpy += delta;
+	stats.l_memcpy += len;
+
+	print_stats();
+#endif
+	return ret;
+}
+
+static void *
+pg_pmem_map_file(char *path, size_t len, int flags, mode_t mode, size_t *map_len, int *is_pmem)
+{
+	void *ret;
+#ifdef PMEM_DEBUG
+
+	struct timespec start, end;
+	int64 delta;
+
+	clock_gettime(CLOCK_MONOTONIC, &start);
+#endif
+	ret = pmem_map_file(path, len, flags, mode, map_len, is_pmem);
+#ifdef PMEM_DEBUG
+
+	clock_gettime(CLOCK_MONOTONIC, &end);
+
+	delta = time_delta(&start, &end);
+
+	stats.n_total += 1;
+	stats.n_mmap += 1;
+	stats.t_mmap += delta;
+
+	print_stats();
+
+#endif
+	return ret;
+}
+
+static int
+pg_pmem_unmap(void *addr, size_t len)
+{
+	int ret;
+#ifdef PMEM_DEBUG
+
+	struct timespec start, end;
+	int64 delta;
+
+	clock_gettime(CLOCK_MONOTONIC, &start);
+
+#endif
+	ret = pmem_unmap(addr, len);
+#ifdef PMEM_DEBUG
+
+	clock_gettime(CLOCK_MONOTONIC, &end);
+
+	delta = time_delta(&start, &end);
+
+	stats.n_total += 1;
+	stats.n_munmap += 1;
+	stats.t_munmap += delta;
+
+	print_stats();
+
+#endif
+	return ret;
+}
+
+static void
+pg_pmem_persist(const char *msg, void *addr, size_t from, size_t to)
+{
+	size_t len = (to - from);
+
+#ifdef PMEM_DEBUG
+
+	struct timespec start, end;
+	int64 delta;
+
+	if ((from < 0) || (from > wal_segment_size))
+		elog(WARNING, "bogus from value %ld", from);
+
+	if ((to < 0) || (to > wal_segment_size) || (to < from))
+		elog(WARNING, "bogus to size %ld", to);
+
+	if ((len <= 0) || (len > wal_segment_size))
+		elog(WARNING, "bogus persist len %ld", len);
+
+	clock_gettime(CLOCK_MONOTONIC, &start);
+
+#endif
+
+	pmem_persist((char *) addr + from, len);
+
+#ifdef PMEM_DEBUG
+
+	clock_gettime(CLOCK_MONOTONIC, &end);
+
+	delta = time_delta(&start, &end);
+
+	stats.n_total += 1;
+	stats.n_persist += 1;
+	stats.t_persist += delta;
+	stats.l_persist += len;
+
+	print_stats();
+
+#endif
+}
+
 /*
  * Insert an XLOG record represented by an already-constructed chain of data
  * chunks.  This is a low-level routine; to construct the WAL record header
@@ -2478,7 +2667,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 			 * pages here (since we dump what we have at segment end).
 			 */
 			Assert(npages == 0);
-			if (openLogFile >= 0)
+			if (openLogFile != NULL)
 				XLogFileClose();
 			XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
 							wal_segment_size);
@@ -2490,7 +2679,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 		}
 
 		/* Make sure we have the current logfile open */
-		if (openLogFile < 0)
+		if (openLogFile == NULL)
 		{
 			XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
 							wal_segment_size);
@@ -2536,7 +2725,11 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 				{
 					errno = 0;
 					pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
-					written = pg_pwrite(openLogFile, from, nleft, startoffset);
+
+					// written = pg_pwrite(openLogFile, from, nleft, startoffset);
+					pg_pmem_memcpy_nodrain((char *) openLogFile + startoffset, from, nleft);
+					written = nleft;
+
 					pgstat_report_wait_end();
 					if (written <= 0)
 					{
@@ -2637,11 +2830,11 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 		if (sync_method != SYNC_METHOD_OPEN &&
 			sync_method != SYNC_METHOD_OPEN_DSYNC)
 		{
-			if (openLogFile >= 0 &&
+			if (openLogFile != NULL &&
 				!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
 								 wal_segment_size))
 				XLogFileClose();
-			if (openLogFile < 0)
+			if (openLogFile == NULL)
 			{
 				XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
 								wal_segment_size);
@@ -3070,7 +3263,7 @@ XLogBackgroundFlush(void)
 	 */
 	if (WriteRqst.Write <= LogwrtResult.Flush)
 	{
-		if (openLogFile >= 0)
+		if (openLogFile != NULL)
 		{
 			if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
 								 wal_segment_size))
@@ -3250,7 +3443,7 @@ XLogNeedsFlush(XLogRecPtr record)
  * take down the system on failure).  They will promote to PANIC if we are
  * in a critical section.
  */
-int
+void *
 XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 {
 	char path[MAXPGPATH];
@@ -3258,10 +3451,13 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 	PGAlignedXLogBlock zbuffer;
 	XLogSegNo installed_segno;
 	XLogSegNo max_segno;
-	int fd;
 	int nbytes;
 	int save_errno;
 
+	void *addr;
+	size_t map_len = 0;
+	int is_pmem = 0;
+
 	XLogFilePath(path, ThisTimeLineID, logsegno, wal_segment_size);
 
 	/*
@@ -3269,8 +3465,15 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 	 */
 	if (*use_existent)
 	{
-		fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
-		if (fd < 0)
+		/*
+		 * Map an existing file. The second argument (len) should be zero,
+		 * the third argument (flags) should have neither PMEM_FILE_CREATE nor
+		 * PMEM_FILE_EXCL, and the fourth argument (mode) will be ignored.
+		 *
+		 * FIXME maybe check the length and is_pmem flags here?
+		 */
+		addr = pg_pmem_map_file(path, 0, 0, 0, &map_len, &is_pmem);
+		if (!addr)
 		{
 			if (errno != ENOENT)
 				ereport(ERROR,
@@ -3278,7 +3481,7 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 						 errmsg("could not open file \"%s\": %m", path)));
 		}
 		else
-			return fd;
+			return addr;
 	}
 
 	/*
@@ -3293,13 +3496,17 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 
 	unlink(tmppath);
 
-	/* do not use get_sync_bit() here --- want to fsync only at end of fill */
-	fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY);
-	if (fd < 0)
+	addr = pg_pmem_map_file(tmppath, wal_segment_size,
+							PMEM_FILE_CREATE | PMEM_FILE_EXCL,
+							pg_file_create_mode, &map_len, &is_pmem);
+
+	if (!addr)
 		ereport(ERROR,
 				(errcode_for_file_access(),
 				 errmsg("could not create file \"%s\": %m", tmppath)));
 
+	/* FIXME check size too */
+
 	memset(zbuffer.data, 0, XLOG_BLCKSZ);
 
 	pgstat_report_wait_start(WAIT_EVENT_WAL_INIT_WRITE);
@@ -3318,7 +3525,7 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 		for (nbytes = 0; nbytes < wal_segment_size; nbytes += XLOG_BLCKSZ)
 		{
 			errno = 0;
-			if (write(fd, zbuffer.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+			if (pg_pmem_memcpy_nodrain((char *) addr + nbytes, zbuffer.data, XLOG_BLCKSZ) != ((char *) addr + nbytes))
 			{
 				/* if write didn't set errno, assume no disk space */
 				save_errno = errno ? errno : ENOSPC;
@@ -3333,7 +3540,7 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 		 * enough.
 		 */
 		errno = 0;
-		if (pg_pwrite(fd, zbuffer.data, 1, wal_segment_size - 1) != 1)
+		if (pg_pmem_memcpy_nodrain((char *) addr + wal_segment_size - 1, zbuffer.data, 1) != ((char *) addr + wal_segment_size - 1))
 		{
 			/* if write didn't set errno, assume no disk space */
 			save_errno = errno ? errno : ENOSPC;
@@ -3348,7 +3555,7 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 		 */
 		unlink(tmppath);
 
-		close(fd);
+		pg_pmem_unmap(addr, wal_segment_size);
 
 		errno = save_errno;
 
@@ -3358,11 +3565,15 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 	}
 
 	pgstat_report_wait_start(WAIT_EVENT_WAL_INIT_SYNC);
-	if (pg_fsync(fd) != 0)
+
+	pg_pmem_persist("XLogFileInit", addr, 0, wal_segment_size);
+
+	if (false)
 	{
 		int save_errno = errno;
 
-		close(fd);
+		pg_pmem_unmap(addr, wal_segment_size);
+
 		errno = save_errno;
 		ereport(ERROR,
 				(errcode_for_file_access(),
@@ -3370,7 +3581,7 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 	}
 	pgstat_report_wait_end();
 
-	if (close(fd) != 0)
+	if (pg_pmem_unmap(addr, wal_segment_size) != 0)
 		ereport(ERROR,
 				(errcode_for_file_access(),
 				 errmsg("could not close file \"%s\": %m", tmppath)));
@@ -3411,15 +3622,17 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
 	*use_existent = false;
 
 	/* Now open original target segment (might not be file I just made) */
-	fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
-	if (fd < 0)
+	addr = pg_pmem_map_file(path, 0, 0, 0, &map_len, &is_pmem);
+	if (!addr)
 		ereport(ERROR,
 				(errcode_for_file_access(),
 				 errmsg("could not open file \"%s\": %m", path)));
 
+	/* FIXME size */
+
 	elog(DEBUG2, "done creating and filling new WAL file");
 
-	return fd;
+	return addr;
 }
 
 /*
@@ -3642,21 +3855,28 @@ InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
 /*
  * Open a pre-existing logfile segment for writing.
  */
-int
+void *
 XLogFileOpen(XLogSegNo segno)
 {
 	char path[MAXPGPATH];
-	int fd;
+	void *addr;
+	size_t map_len = 0;
+	int is_pmem = 0;
 
 	XLogFilePath(path, ThisTimeLineID, segno, wal_segment_size);
 
-	fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
-	if (fd < 0)
+	addr = pg_pmem_map_file(path, 0, 0, 0, &map_len, &is_pmem);
+	if (!addr)
 		ereport(PANIC,
 				(errcode_for_file_access(),
 				 errmsg("could not open file \"%s\": %m", path)));
 
-	return fd;
+	if (map_len != wal_segment_size)
+		elog(PANIC, "size of non-volatile WAL buffer '%s' is invalid; "
+			 "expected %zu; actual %zu",
+			 path, (size_t) wal_segment_size, map_len);
+
+	return addr;
 }
 
 /*
@@ -3852,7 +4072,7 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source)
 static void
 XLogFileClose(void)
 {
-	Assert(openLogFile >= 0);
+	Assert(openLogFile != NULL);
 
 	/*
 	 * WAL segment files will not be re-read in normal operation, so we advise
@@ -3861,11 +4081,11 @@ XLogFileClose(void)
 	 * use the cache to read the WAL segment.
 	 */
 #if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
-	if (!XLogIsNeeded())
-		(void) posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
+	// if (!XLogIsNeeded())
+	// (void) posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
 #endif
 
-	if (close(openLogFile) != 0)
+	if (pg_pmem_unmap(openLogFile, wal_segment_size) < 0)
 	{
 		char xlogfname[MAXFNAMELEN];
 		int save_errno = errno;
@@ -3877,7 +4097,7 @@ XLogFileClose(void)
 				 errmsg("could not close file \"%s\": %m", xlogfname)));
 	}
 
-	openLogFile = -1;
+	openLogFile = NULL;
 	ReleaseExternalFD();
 }
 
@@ -3895,7 +4115,7 @@ static void
 PreallocXlogFiles(XLogRecPtr endptr)
 {
 	XLogSegNo _logSegNo;
-	int lf;
+	void *lf;
 	bool use_existent;
 	uint64 offset;
 
@@ -3906,7 +4126,7 @@ PreallocXlogFiles(XLogRecPtr endptr)
 		_logSegNo++;
 		use_existent = true;
 		lf = XLogFileInit(_logSegNo, &use_existent, true);
-		close(lf);
+		pg_pmem_unmap(lf, wal_segment_size);
 		if (!use_existent)
 			CheckpointStats.ckpt_segs_added++;
 	}
@@ -5308,7 +5528,7 @@ BootStrapXLOG(void)
 	/* Write the first page with the initial record */
 	errno = 0;
 	pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_WRITE);
-	if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+	if (pg_pmem_memcpy_nodrain(openLogFile, page, XLOG_BLCKSZ) != openLogFile)
 	{
 		/* if write didn't set errno, assume problem is no disk space */
 		if (errno == 0)
@@ -5320,18 +5540,20 @@ BootStrapXLOG(void)
 	pgstat_report_wait_end();
 
 	pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_SYNC);
-	if (pg_fsync(openLogFile) != 0)
+	pg_pmem_persist("BootStrapXLOG", openLogFile, 0, wal_segment_size);
+
+	if (false)
 		ereport(PANIC,
 				(errcode_for_file_access(),
 				 errmsg("could not fsync bootstrap write-ahead log file: %m")));
 	pgstat_report_wait_end();
 
-	if (close(openLogFile) != 0)
+	if (pg_pmem_unmap(openLogFile, wal_segment_size) != 0)
 		ereport(PANIC,
 				(errcode_for_file_access(),
 				 errmsg("could not close bootstrap write-ahead log file: %m")));
 
-	openLogFile = -1;
+	openLogFile = NULL;
 
 	/* Now create pg_control */
 	InitControlFile(sysidentifier);
@@ -5605,11 +5827,11 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog)
 		 * segment on the new timeline.
 		 */
 		bool use_existent = true;
-		int fd;
+		void *addr;
 
-		fd = XLogFileInit(startLogSegNo, &use_existent, true);
+		addr = XLogFileInit(startLogSegNo, &use_existent, true);
 
-		if (close(fd) != 0)
+		if (pg_pmem_unmap(addr, wal_segment_size) != 0)
 		{
 			char xlogfname[MAXFNAMELEN];
 			int save_errno = errno;
@@ -10373,10 +10595,12 @@ assign_xlog_sync_method(int new_sync_method, void *extra)
 		 * changing, close the log file so it will be reopened (with new flag
 		 * bit) at next use.
 		 */
-		if (openLogFile >= 0)
+		if (openLogFile != NULL)
 		{
 			pgstat_report_wait_start(WAIT_EVENT_WAL_SYNC_METHOD_ASSIGN);
-			if (pg_fsync(openLogFile) != 0)
+			pg_pmem_persist("assign_xlog_sync_method", openLogFile, 0, wal_segment_size);
+
+			if (false)
 			{
 				char xlogfname[MAXFNAMELEN];
 				int save_errno;
@@ -10405,37 +10629,21 @@ assign_xlog_sync_method(int new_sync_method, void *extra)
  * 'segno' is for error reporting purposes.
  */
 void
-issue_xlog_fsync(int fd, XLogSegNo segno)
+issue_xlog_fsync(void *addr, XLogSegNo segno)
 {
 	char *msg = NULL;
 
+	/* XXX not sure if correct? */
+	size_t from = (LogwrtResult.Flush % wal_segment_size);
+	size_t to = (LogwrtResult.Write % wal_segment_size);
+
+	/* flush until the end of the segment */
+	if (to == 0)
+		to = wal_segment_size;
+
 	pgstat_report_wait_start(WAIT_EVENT_WAL_SYNC);
-	switch (sync_method)
-	{
-		case SYNC_METHOD_FSYNC:
-			if (pg_fsync_no_writethrough(fd) != 0)
-				msg = _("could not fsync file \"%s\": %m");
-			break;
-#ifdef HAVE_FSYNC_WRITETHROUGH
-		case SYNC_METHOD_FSYNC_WRITETHROUGH:
-			if (pg_fsync_writethrough(fd) != 0)
-				msg = _("could not fsync write-through file \"%s\": %m");
-			break;
-#endif
-#ifdef HAVE_FDATASYNC
-		case SYNC_METHOD_FDATASYNC:
-			if (pg_fdatasync(fd) != 0)
-				msg = _("could not fdatasync file \"%s\": %m");
-			break;
-#endif
-		case SYNC_METHOD_OPEN:
-		case SYNC_METHOD_OPEN_DSYNC:
-			/* write synced it already */
-			break;
-		default:
-			elog(PANIC, "unrecognized wal_sync_method: %d", sync_method);
-			break;
-	}
+
+	pg_pmem_persist("issue_xlog_fsync", addr, from, to);
 
 	/* PANIC if failed to fsync */
 	if (msg)
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 87c3ea450e..2836d82132 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -50,6 +50,7 @@
 #include "postgres.h"
 
 #include <unistd.h>
+#include <libpmem.h>
 
 #include "access/htup_details.h"
 #include "access/timeline.h"
@@ -100,7 +101,7 @@ WalReceiverFunctionsType *WalReceiverFunctions = NULL;
  * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
  * corresponding the filename of recvFile.
  */
-static int recvFile = -1;
+static void *recvFile = NULL;
 static TimeLineID recvFileTLI = 0;
 static XLogSegNo recvSegNo = 0;
 
@@ -602,7 +603,7 @@ WalReceiverMain(void)
 
 			XLogWalRcvFlush(false);
 			XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
-			if (close(recvFile) != 0)
+			if (pmem_unmap(recvFile, wal_segment_size) != 0)
 				ereport(PANIC,
 						(errcode_for_file_access(),
 						 errmsg("could not close log segment %s: %m",
@@ -617,7 +618,7 @@ WalReceiverMain(void)
 			else
 				XLogArchiveNotify(xlogfname);
 		}
-		recvFile = -1;
+		recvFile = NULL;
 
 		elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
 		WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
@@ -896,7 +897,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 				 * process soon, so we don't advise the OS to release cache
 				 * pages associated with the file like XLogFileClose() does.
 				 */
-				if (close(recvFile) != 0)
+				if (pmem_unmap(recvFile, wal_segment_size) != 0)
 					ereport(PANIC,
 							(errcode_for_file_access(),
 							 errmsg("could not close log segment %s: %m",
@@ -911,7 +912,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 				else
 					XLogArchiveNotify(xlogfname);
 			}
-			recvFile = -1;
+			recvFile = NULL;
 
 			/* Create/use new log file */
 			XLByteToSeg(recptr, recvSegNo, wal_segment_size);
@@ -931,7 +932,10 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 
 		/* OK to write the logs */
 		errno = 0;
-		byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
+		// byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
+		pmem_memcpy_nodrain((char *) recvFile + startoff, buf, segbytes);
+		byteswritten = segbytes;
+
 		if (byteswritten <= 0)
 		{
 			char xlogfname[MAXFNAMELEN];
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 221af87e71..d63e5522b4 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -287,8 +287,8 @@ extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
 extern void XLogFlush(XLogRecPtr RecPtr);
 extern bool XLogBackgroundFlush(void);
 extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
-extern int XLogFileInit(XLogSegNo segno, bool *use_existent, bool use_lock);
-extern int XLogFileOpen(XLogSegNo segno);
+extern void *XLogFileInit(XLogSegNo segno, bool *use_existent, bool use_lock);
+extern void *XLogFileOpen(XLogSegNo segno);
 extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
 extern XLogSegNo XLogGetLastRemovedSegno(void);
 
@@ -299,7 +299,7 @@ extern void xlog_redo(XLogReaderState *record);
 extern void xlog_desc(StringInfo buf, XLogReaderState *record);
 extern const char *xlog_identify(uint8 info);
 
-extern void issue_xlog_fsync(int fd, XLogSegNo segno);
+extern void issue_xlog_fsync(void *addr, XLogSegNo segno);
 
 extern bool RecoveryInProgress(void);
 extern RecoveryState GetRecoveryState(void);