On 09.04.2019 18:48, Robert Haas wrote:
1. There should be a way to tell pg_basebackup to request from the
server only those blocks where LSN >= threshold_value.
Some time ago I implemented an alternative version of the ptrack utility
(not the one used in pg_probackup)
which detects updated blocks at the file level. It is very simple, and maybe
it can someday be integrated into master.
I have attached a patch against vanilla Postgres to this mail.
Right now it adds just two GUCs:
ptrack_map_size: size of the ptrack map (number of elements) used for
incremental backup; 0 disables it.
ptrack_block_log: base-2 logarithm of the ptrack block size (in pages)
and one function:
pg_ptrack_get_changeset(startlsn pg_lsn) returns
{relid,relfilenode,reltablespace,forknum,blocknum,segsize,updlsn,path}
The idea is very simple: it creates a hash map of fixed size (ptrack_map_size)
and stores the LSN of written pages in this map.
Since the default Postgres page size seems to be too small for a ptrack
block (it would require too large a hash map, or increase the number of
conflicts as well as the number of random reads), a ptrack block can be
configured to consist of multiple pages (a power of 2).
This patch uses memory mapping. Unfortunately there is no portable
wrapper for it in Postgres, so I had to provide my own implementations for
Unix/Windows. Admittedly this is not good and should be rewritten.
How to use it?
1. Define ptrack_map_size in postgresql.conf, for example (use a prime
number for more uniform hashing):
ptrack_map_size = 1000003
2. Remember the current LSN.
$ psql postgres -c "select pg_current_wal_lsn()"
pg_current_wal_lsn
--------------------
0/224A268
(1 row)
3. Do some updates.
$ pgbench -T 10 postgres
4. Select changed blocks.
select * from pg_ptrack_get_changeset('0/224A268');
 relid | relfilenode | reltablespace | forknum | blocknum | segsize |  updlsn   |         path
-------+-------------+---------------+---------+----------+---------+-----------+----------------------
 16390 |       16396 |          1663 |       0 |     1640 |       1 | 0/224FD88 | base/12710/16396
 16390 |       16396 |          1663 |       0 |     1641 |       1 | 0/2258680 | base/12710/16396
 16390 |       16396 |          1663 |       0 |     1642 |       1 | 0/22615A0 | base/12710/16396
...
Of course, ptrack is meant to be used as part of a backup tool (such as
pg_basebackup or pg_probackup).
--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 61a8f11..f4b8506 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -25,9 +25,20 @@
#include <fcntl.h>
#include <sys/file.h>
+#include <sys/stat.h>
+#ifndef WIN32
+#include "sys/mman.h"
+#endif
+
#include "miscadmin.h"
+#include "funcapi.h"
+#include "access/hash.h"
+#include "access/table.h"
#include "access/xlogutils.h"
#include "access/xlog.h"
+#include "access/htup_details.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_tablespace.h"
#include "pgstat.h"
#include "postmaster/bgwriter.h"
#include "storage/fd.h"
@@ -36,6 +47,7 @@
#include "storage/relfilenode.h"
#include "storage/smgr.h"
#include "storage/sync.h"
+#include "utils/builtins.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
+#include "utils/pg_lsn.h"
+#include "utils/relmapper.h"
#include "pg_trace.h"
@@ -116,6 +128,18 @@ static MemoryContext MdCxt; /* context for all MdfdVec objects */
*/
#define EXTENSION_DONT_CHECK_SIZE (1 << 4)
+/*
+ * Size of ptrack map (number of entries); 0 disables ptrack.
+ * Backs the ptrack_map_size GUC.
+ */
+int ptrack_map_size;
+
+/*
+ * Base-2 logarithm of the number of pages covered by one ptrack map entry.
+ * Backs the ptrack_block_log GUC.
+ */
+int ptrack_block_log;
+
+/* Descriptor of the ptrack map file, or -1 while it is not open */
+static int ptrack_fd = -1;
+
+/* Shared read-write mapping of the ptrack map file, or NULL until mapped */
+static pg_atomic_uint64 *ptrack_map = NULL;
/* local routines */
static void mdunlinkfork(RelFileNodeBackend rnode, ForkNumber forkNum,
@@ -138,7 +162,7 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forkno,
BlockNumber blkno, bool skipFsync, int behavior);
static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
MdfdVec *seg);
-
+static void ptrack_mark_block(SMgrRelation reln, ForkNumber forkno, BlockNumber blkno);
/*
* mdinit() -- Initialize private state for magnetic disk storage manager.
@@ -422,6 +446,8 @@ mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
register_dirty_segment(reln, forknum, v);
Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
+
+ ptrack_mark_block(reln, forknum, blocknum);
}
/*
@@ -575,6 +601,8 @@ mdwriteback(SMgrRelation reln, ForkNumber forknum,
nblocks -= nflush;
blocknum += nflush;
}
+
+ ptrack_sync();
}
/*
@@ -700,6 +728,8 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
if (!skipFsync && !SmgrIsTemp(reln))
register_dirty_segment(reln, forknum, v);
+
+ ptrack_mark_block(reln, forknum, blocknum);
}
/*
@@ -886,6 +916,7 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum)
FilePathName(v->mdfd_vfd))));
segno--;
}
+ ptrack_sync();
}
/*
@@ -918,6 +949,7 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
errmsg("could not fsync file \"%s\": %m",
FilePathName(seg->mdfd_vfd))));
}
+ ptrack_sync();
}
/*
@@ -1315,3 +1347,294 @@ mdfiletagmatches(const FileTag *ftag, const FileTag *candidate)
*/
return ftag->rnode.dbNode == candidate->rnode.dbNode;
}
+
+/*
+ * ---------------------------------------------------------
+ * PTrack functions
+ *
+ * ptrack keeps, in a fixed-size memory-mapped file, the most recent WAL
+ * position at which each (hashed) group of blocks was written, so that the
+ * blocks changed since a given LSN can be listed for incremental backup.
+ */
+
+/* Relative path of the ptrack map file, opened via BasicOpenFile() */
+#define PTRACK_MAP_PATH "global/ptrack.map"
+
+/*
+ * Structure identifying block on the disk.
+ *
+ * This is the key hashed into a ptrack map slot: BID_HASH_FUNC hashes the
+ * raw bytes of this struct, so its layout decides which slot of the
+ * persistent map file a block uses and must not be changed casually.
+ */
+typedef struct PtBlockId
+{
+ RelFileNode relnode;	/* tablespace/database/relfilenode of the relation */
+ ForkNumber forknum;	/* relation fork */
+ BlockNumber blocknum;	/* ptrack block number: page number >> ptrack_block_log */
+} PtBlockId;
+
+/*
+ * Map a PtBlockId lvalue to a slot index in [0, ptrack_map_size).
+ *
+ * The key is hashed as raw bytes, so the argument must be fully initialized,
+ * including any padding.  PostgreSQL 11+ provides a 64-bit hash; older
+ * releases only a 32-bit one.  ptrack_map_size must be non-zero.
+ */
+#if PG_VERSION_NUM >= 110000
+#define BID_HASH_FUNC(bid) \
+	((size_t) (DatumGetUInt64(hash_any_extended((const unsigned char *) &(bid), sizeof(bid), 0)) % (uint64) ptrack_map_size))
+#else
+#define BID_HASH_FUNC(bid) \
+	((size_t) (DatumGetUInt32(hash_any((const unsigned char *) &(bid), sizeof(bid))) % (uint32) ptrack_map_size))
+#endif
+
+
+/*
+ * Close a partially initialized ptrack map descriptor while preserving errno,
+ * so that a following "%m" still reports the original failure.
+ */
+static void
+ptrack_close_keep_errno(int fd)
+{
+	int			save_errno = errno;
+
+	close(fd);
+	errno = save_errno;
+}
+
+/*
+ * Map ptrack file
+ *
+ * Opens (creating if necessary) PTRACK_MAP_PATH, sizes it to hold
+ * ptrack_map_size 64-bit LSN slots rounded up to a multiple of BLCKSZ, and
+ * maps it shared and read-write into ptrack_map.  An existing non-empty file
+ * of a different size is rejected with FATAL, because its slots would be
+ * hashed differently.  On any failure the descriptor is closed again and
+ * ptrack_map is left NULL, so a later retry neither leaks a descriptor nor
+ * dereferences a failed mapping.
+ */
+static void
+open_ptrack_file(void)
+{
+	/* Align map size on page boundary; compute in size_t to avoid int overflow */
+	size_t		size = ((size_t) ptrack_map_size * sizeof(pg_atomic_uint64) + BLCKSZ - 1) &
+		~((size_t) BLCKSZ - 1);
+	off_t		file_size;
+	int			fd;
+
+#if PG_VERSION_NUM >= 110000
+	fd = BasicOpenFile(PTRACK_MAP_PATH, O_RDWR | O_CREAT | PG_BINARY);
+#else
+	fd = BasicOpenFile(PTRACK_MAP_PATH, O_RDWR | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+#endif
+	if (fd < 0)
+		elog(ERROR, "Failed to open ptrack map file: %m");
+
+	file_size = lseek(fd, 0, SEEK_END);
+	if (file_size < 0)
+	{
+		ptrack_close_keep_errno(fd);
+		elog(ERROR, "Failed to determine size of ptrack map file: %m");
+	}
+	if (file_size != 0 && (uint64) file_size != (uint64) size)
+	{
+		close(fd);
+		elog(FATAL, "Specified ptrack map size %zu doesn't match with actual file size " INT64_FORMAT,
+			 size, (int64) file_size);
+	}
+
+#ifdef WIN32
+	{
+		/* CreateFileMapping() takes the mapping size as two 32-bit halves */
+		HANDLE		mh = CreateFileMapping((HANDLE) _get_osfhandle(fd), NULL, PAGE_READWRITE,
+										   (DWORD) ((uint64) size >> 32), (DWORD) size, NULL);
+
+		if (mh == NULL)
+		{
+			_dosmaperr(GetLastError());
+			ptrack_close_keep_errno(fd);
+			elog(ERROR, "Failed to create file mapping: %m");
+		}
+
+		ptrack_map = (pg_atomic_uint64 *) MapViewOfFile(mh, FILE_MAP_ALL_ACCESS, 0, 0, 0);
+		if (ptrack_map == NULL)
+			_dosmaperr(GetLastError());
+		/* Release the mapping handle on both paths; a successful view keeps its own reference */
+		CloseHandle(mh);
+		if (ptrack_map == NULL)
+		{
+			ptrack_close_keep_errno(fd);
+			elog(ERROR, "Failed to mmap ptrack file: %m");
+		}
+	}
+#else
+	if (ftruncate(fd, size) < 0)
+	{
+		ptrack_close_keep_errno(fd);
+		elog(ERROR, "Failed to truncate ptrack file: %m");
+	}
+	{
+		/* Keep MAP_FAILED out of ptrack_map: callers treat non-NULL as mapped */
+		void	   *addr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+
+		if (addr == MAP_FAILED)
+		{
+			ptrack_close_keep_errno(fd);
+			elog(ERROR, "Failed to mmap ptrack file: %m");
+		}
+		ptrack_map = (pg_atomic_uint64 *) addr;
+	}
+#endif
+	ptrack_fd = fd;
+}
+
+/*
+ * Mark modified block
+ *
+ * Coarsens the block to its ptrack block (2^ptrack_block_log pages), hashes
+ * it into one of ptrack_map_size slots, and raises that slot to the current
+ * WAL position if the slot holds an older one.  Slots only ever increase, so
+ * concurrent writers cannot lose an update.  Nothing is done when ptrack is
+ * disabled (ptrack_map_size == 0) or for backend-local (temporary)
+ * relations.  The map file is mapped lazily on first use.
+ */
+static void
+ptrack_mark_block(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
+{
+	PtBlockId	bid;
+	size_t		hash;
+	XLogRecPtr	new_lsn;
+	uint64		old_lsn;
+
+	/* do not track temporary relations */
+	if (ptrack_map_size == 0 || reln->smgr_rnode.backend != InvalidBackendId)
+		return;
+
+	if (ptrack_map == NULL)
+		open_ptrack_file();
+
+	/* Zero the key first so that any padding bytes hash deterministically */
+	MemSet(&bid, 0, sizeof(bid));
+	bid.relnode = reln->smgr_rnode.node;
+	bid.forknum = forknum;
+	bid.blocknum = blocknum >> ptrack_block_log;
+	hash = BID_HASH_FUNC(bid);
+
+	/*
+	 * No WAL is inserted while in recovery, so the insert position does not
+	 * reflect the WAL being replayed; use the replay position instead, or
+	 * blocks written during redo would never be reported as changed.
+	 */
+	if (RecoveryInProgress())
+		new_lsn = GetXLogReplayRecPtr(NULL);
+	else
+		new_lsn = GetXLogInsertRecPtr();
+
+	old_lsn = pg_atomic_read_u64(&ptrack_map[hash]);
+
+	elog(DEBUG2, "map[%zu]=" UINT64_FORMAT " <- " UINT64_FORMAT,
+		 hash, old_lsn, (uint64) new_lsn);
+
+	/*
+	 * Atomically assign new LSN value.  A failed compare-exchange reloads
+	 * old_lsn with the slot's current value, so the loop ends as soon as an
+	 * equal or newer LSN is stored there.
+	 */
+	while (old_lsn < new_lsn)
+	{
+		if (pg_atomic_compare_exchange_u64(&ptrack_map[hash], &old_lsn, new_lsn))
+			break;
+	}
+}
+
+/*
+ * Flush ptrack map on the disk.
+ *
+ * This is a best-effort, asynchronous flush: failures are only logged at LOG
+ * level, and no fsync is issued because, per the design of this patch,
+ * pages modified before a crash are restored from WAL.  Does nothing if the
+ * map has not been mapped in this process.
+ */
+void
+ptrack_sync(void)
+{
+	size_t		size;
+
+	if (ptrack_map == NULL)
+		return;
+
+	/* Same page-aligned size that open_ptrack_file() mapped */
+	size = ((size_t) ptrack_map_size * sizeof(pg_atomic_uint64) + BLCKSZ - 1) &
+		~((size_t) BLCKSZ - 1);
+
+#ifdef WIN32
+	if (!FlushViewOfFile(ptrack_map, size))
+	{
+		/* Map the Win32 error into errno so that %m is meaningful */
+		_dosmaperr(GetLastError());
+		elog(LOG, "Failed to flush ptrack map: %m");
+	}
+#else
+	if (msync(ptrack_map, size, MS_ASYNC) < 0)
+		elog(LOG, "Failed to flush ptrack map: %m");
+#endif
+}
+
+/*
+ * Iteration state of the pg_ptrack_get_changeset() set-returning function,
+ * allocated once in the SRF's multi-call memory context.
+ */
+typedef struct PtScanCtx
+{
+	SysScanDesc scan;			/* scan over pg_class */
+	Relation	rel;			/* pg_class, opened with AccessShareLock */
+	Form_pg_class meta;			/* current pg_class row; NULL = fetch next */
+	XLogRecPtr	lsn;			/* report blocks updated at/after this LSN */
+	PtBlockId	bid;			/* current relation, fork and ptrack block */
+	Oid			oid;			/* OID of the current relation */
+	uint32		relsize;		/* known size of the current fork, in pages */
+	TupleDesc	tupdesc;		/* descriptor for the result rows */
+} PtScanCtx;
+
+/*
+ * Check if segment file exists
+ *
+ * Looks for the segment file holding ctx->bid's current ptrack block and, if
+ * it exists, raises ctx->relsize (in pages) to cover the blocks found there.
+ * A non-full segment normally ends the fork, but if the next segment file is
+ * present too, the whole current segment is counted: some blocks of a
+ * relation may never have been written, so non-last segments can be short.
+ *
+ * Returns false if the segment file does not exist.  Other stat() failures
+ * raise an error instead of silently cutting the fork short.
+ */
+static bool
+segment_exists(PtScanCtx *ctx)
+{
+	char		pathname[MAXPGPATH];
+	BlockNumber segno = (ctx->bid.blocknum << ptrack_block_log) / RELSEG_SIZE;
+	struct stat fst;
+	uint32		cursize;
+	char	   *relpath = relpathperm(ctx->bid.relnode, ctx->bid.forknum);
+
+	if (segno == 0)
+		snprintf(pathname, MAXPGPATH, "%s", relpath);
+	else
+		snprintf(pathname, MAXPGPATH, "%s.%u", relpath, segno);
+
+	if (stat(pathname, &fst) != 0)
+	{
+		if (errno != ENOENT)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m", pathname)));
+		pfree(relpath);
+		return false;
+	}
+
+	cursize = segno * RELSEG_SIZE + fst.st_size / BLCKSZ;
+
+	/* Compare in off_t: BLCKSZ * RELSEG_SIZE may not fit in an int */
+	if (fst.st_size < (off_t) BLCKSZ * RELSEG_SIZE)
+	{
+		/* If the next segment exists, consider all blocks of this segment */
+		snprintf(pathname, MAXPGPATH, "%s.%u", relpath, segno + 1);
+		if (stat(pathname, &fst) == 0)
+			cursize = (segno + 1) * RELSEG_SIZE;
+	}
+
+	/* relpathperm() pallocs; this runs many times within one SRF call */
+	pfree(relpath);
+
+	/* Initial value of relsize is taken from pg_class table */
+	if (cursize > ctx->relsize)
+		ctx->relsize = cursize;
+	return true;
+}
+
+PG_FUNCTION_INFO_V1(pg_ptrack_get_changeset);
+
+/*
+ * Advance ctx to the next pg_class entry that is not temporary and has
+ * on-disk storage, positioned at block 0 of its main fork.  Returns false
+ * once the pg_class scan is exhausted.
+ */
+static bool
+ptrack_next_relation(PtScanCtx *ctx)
+{
+	HeapTuple	tuple;
+
+	while (HeapTupleIsValid(tuple = systable_getnext(ctx->scan)))
+	{
+		Form_pg_class meta = (Form_pg_class) GETSTRUCT(tuple);
+		Oid			filenode;
+
+		/* Temporary relations are never marked by ptrack_mark_block() */
+		if (meta->relpersistence == RELPERSISTENCE_TEMP)
+			continue;
+
+		/*
+		 * relfilenode is 0 for mapped catalogs, whose file number lives in the
+		 * relation mapper, and for relations without storage, which the mapper
+		 * does not know (InvalidOid).  Skip the latter.
+		 */
+		filenode = meta->relfilenode;
+		if (!OidIsValid(filenode))
+			filenode = RelationMapOidToFilenode(meta->oid, meta->relisshared);
+		if (!OidIsValid(filenode))
+			continue;
+
+		ctx->meta = meta;
+		ctx->oid = meta->oid;
+		/* reltablespace 0 means the current database's default tablespace */
+		ctx->bid.relnode.spcNode = OidIsValid(meta->reltablespace) ?
+			meta->reltablespace : MyDatabaseTableSpace;
+		ctx->bid.relnode.dbNode = meta->relisshared ? InvalidOid : MyDatabaseId;
+		ctx->bid.relnode.relNode = filenode;
+		ctx->bid.forknum = MAIN_FORKNUM;
+		ctx->bid.blocknum = 0;
+		/* relpages may be stale; segment_exists() raises it to the file size */
+		ctx->relsize = meta->relpages;
+		return true;
+	}
+	return false;
+}
+
+/*
+ * Return set of database blocks which were changed since specified LSN.
+ *
+ * Walks every non-temporary relation with storage in the current database's
+ * pg_class and, for each of its forks, reports runs of consecutive ptrack
+ * blocks whose map slot holds the same LSN, when that LSN is >= startlsn.
+ * A run never crosses a segment file boundary or the end of the fork.
+ * Several blocks share one map slot, so this function may return false
+ * positives (blocks which were not really updated).
+ *
+ * Columns: relid, relfilenode, reltablespace, forknum, blocknum (first block
+ * of the run, counted from the start of the fork), segsize (blocks in the
+ * run), updlsn, and path of the segment file containing the run.
+ *
+ * The pg_class scan and its lock are released only when the result set is
+ * exhausted.
+ */
+Datum
+pg_ptrack_get_changeset(PG_FUNCTION_ARGS)
+{
+	FuncCallContext *funcctx;
+	PtScanCtx  *ctx;
+
+	if (SRF_IS_FIRSTCALL())
+	{
+		MemoryContext oldcontext;
+		TupleDesc	tupdesc;
+
+		funcctx = SRF_FIRSTCALL_INIT();
+
+		/* Without a map there is nothing to report, and hashing would divide by zero */
+		if (ptrack_map_size == 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("ptrack is disabled"),
+					 errhint("Set ptrack_map_size to a positive value and restart the server.")));
+
+		if (ptrack_map == NULL)
+			open_ptrack_file();
+
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+			elog(ERROR, "return type must be a row type");
+
+		ctx = (PtScanCtx *) palloc0(sizeof(PtScanCtx));
+		/* Returning composite Datums of type record requires a blessed descriptor */
+		ctx->tupdesc = BlessTupleDesc(tupdesc);
+		ctx->rel = heap_open(RelationRelationId, AccessShareLock);
+		ctx->scan = systable_beginscan(ctx->rel, InvalidOid, false,
+									   NULL, 0, NULL);
+		ctx->lsn = PG_GETARG_LSN(0);
+		funcctx->user_fctx = ctx;
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	funcctx = SRF_PERCALL_SETUP();
+	ctx = (PtScanCtx *) funcctx->user_fctx;
+
+	for (;;)
+	{
+		BlockNumber blocknum;
+		uint64		seg_end;
+		uint64		limit;
+		XLogRecPtr	update_lsn;
+		uint32		n_blocks;
+
+		if (ctx->meta == NULL && !ptrack_next_relation(ctx))
+		{
+			/* Done: all database relations traversed */
+			systable_endscan(ctx->scan);
+			heap_close(ctx->rel, AccessShareLock);
+			SRF_RETURN_DONE(funcctx);
+		}
+
+		/* Stop traversal at a missing segment or past the last block of the fork */
+		blocknum = ctx->bid.blocknum << ptrack_block_log;
+		if ((blocknum % RELSEG_SIZE == 0 && !segment_exists(ctx)) || blocknum >= ctx->relsize)
+		{
+			if (++ctx->bid.forknum > MAX_FORKNUM)
+				ctx->meta = NULL;
+			else
+			{
+				ctx->bid.blocknum = 0;
+				/* relpages describes the main fork only; size other forks from their files */
+				ctx->relsize = 0;
+			}
+			continue;
+		}
+
+		/* A run must stay within one segment file and within the fork */
+		seg_end = ((uint64) blocknum / RELSEG_SIZE + 1) * RELSEG_SIZE;
+		limit = Min(seg_end, (uint64) ctx->relsize);
+
+		/* Merge following ptrack blocks that carry the same LSN into one run */
+		update_lsn = pg_atomic_read_u64(&ptrack_map[BID_HASH_FUNC(ctx->bid)]);
+		n_blocks = 0;
+		do
+		{
+			ctx->bid.blocknum += 1;
+			n_blocks += 1 << ptrack_block_log;
+		} while ((uint64) blocknum + n_blocks < limit &&
+				 pg_atomic_read_u64(&ptrack_map[BID_HASH_FUNC(ctx->bid)]) == update_lsn);
+
+		/* The last ptrack block before the limit may be only partly populated */
+		if ((uint64) blocknum + n_blocks > limit)
+			n_blocks = (uint32) (limit - blocknum);
+
+		if (update_lsn >= ctx->lsn)	/* block was changed since specified LSN */
+		{
+			Datum		values[8];
+			bool		nulls[8] = {false};
+			char		pathname[MAXPGPATH];
+			BlockNumber segno = blocknum / RELSEG_SIZE;
+			char	   *relpath = relpathperm(ctx->bid.relnode, ctx->bid.forknum);
+
+			if (segno == 0)
+				snprintf(pathname, MAXPGPATH, "%s", relpath);
+			else
+				snprintf(pathname, MAXPGPATH, "%s.%u", relpath, segno);
+			pfree(relpath);
+
+			values[0] = ObjectIdGetDatum(ctx->oid);
+			values[1] = ObjectIdGetDatum(ctx->bid.relnode.relNode);
+			values[2] = ObjectIdGetDatum(ctx->bid.relnode.spcNode);
+			values[3] = Int32GetDatum(ctx->bid.forknum);
+			values[4] = Int32GetDatum((int32) blocknum);
+			values[5] = Int32GetDatum((int32) n_blocks);
+			values[6] = LSNGetDatum(update_lsn);
+			values[7] = CStringGetTextDatum(pathname);
+
+			SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(heap_form_tuple(ctx->tupdesc, values, nulls)));
+		}
+	}
+}
+
diff --git a/src/backend/storage/sync/sync.c b/src/backend/storage/sync/sync.c
index f77519d..3a7ceee 100644
--- a/src/backend/storage/sync/sync.c
+++ b/src/backend/storage/sync/sync.c
@@ -418,6 +418,9 @@ ProcessSyncRequests(void)
CheckpointStats.ckpt_longest_sync = longest;
CheckpointStats.ckpt_agg_sync_time = total_elapsed;
+ /* Flush ptrack files */
+ ptrack_sync();
+
/* Flag successful completion of ProcessSyncRequests */
sync_in_progress = false;
}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index f7f726b..545700b 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1963,6 +1963,24 @@ static struct config_bool ConfigureNamesBool[] =
static struct config_int ConfigureNamesInt[] =
{
{
+ {"ptrack_map_size", PGC_POSTMASTER, RESOURCES_DISK,
+ gettext_noop("Size of ptrack map (number of elements) used for incremental backup: 0 disabled."),
+ NULL
+ },
+ &ptrack_map_size,
+ 0, 0, INT_MAX / 2,
+ NULL, NULL, NULL
+ },
+ {
+ {"ptrack_block_log", PGC_POSTMASTER, RESOURCES_DISK,
+ gettext_noop("Logarithm of ptrack block size (amount of pages)."),
+ NULL
+ },
+ &ptrack_block_log,
+ 0, 0, INT_MAX / 2,
+ NULL, NULL, NULL
+ },
+ {
{"archive_timeout", PGC_SIGHUP, WAL_ARCHIVING,
gettext_noop("Forces a switch to the next WAL file if a "
"new file has not been started within N seconds."),
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index ad4519e..add45a2 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10672,4 +10672,13 @@
proname => 'pg_partition_root', prorettype => 'regclass',
proargtypes => 'regclass', prosrc => 'pg_partition_root' },
+# Ptrack changeset traversal
+{ oid => '5050', descr => 'Return set of database blocks which were changed since specified LSN',
+ proname => 'pg_ptrack_get_changeset', prorows => '1000000', proretset => 't',
+ provolatile => 'v', prorettype => 'record', proargtypes => 'pg_lsn',
+ proallargtypes => '{pg_lsn,oid,oid,oid,int4,int4,int4,pg_lsn,text}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o}',
+ proargnames => '{startlsn,relid,relfilenode,reltablespace,forknum,blocknum,segsize,updlsn,path}',
+ prosrc => 'pg_ptrack_get_changeset' },
+
]
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index e5270d2..a0326d9 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -164,6 +164,9 @@ typedef FormData_pg_proc *Form_pg_proc;
#define PROPARALLEL_RESTRICTED 'r' /* can run in parallel master only */
#define PROPARALLEL_UNSAFE 'u' /* banned while in parallel mode */
+/* Ptrack changeset traversal */
+DATA(insert OID = 5050 ( pg_ptrack_get_changeset PGNSP PGUID 12 1 0 0 0 f f f f t t v s 1 0 2249 "3220" "{3220,26,26,26,23,23,23,3220,25}" "{i,o,o,o,o,o,o,o,o}" "{startlsn,relid,relfilenode,reltablespace,forknum,blocknum,segsize,updlsn,path}" _null_ _null_ pg_ptrack_get_changeset _null_ _null_ _null_ ));
+
/*
* Symbolic values for proargmodes column. Note that these must agree with
* the FunctionParameterMode enum in parsenodes.h; we declare them here to
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index a03b4d1..cab7791 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -44,6 +44,15 @@
typedef int File;
+/*
+ * Size of ptrack map (number of entries)
+ */
+extern int ptrack_map_size;
+
+/*
+ * Logarithm of ptrack block size (amount of pages)
+ */
+extern int ptrack_block_log;
/* GUC parameter */
extern PGDLLIMPORT int max_files_per_process;
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index a6758a1..2f94070 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -42,6 +42,7 @@ extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
extern void ForgetDatabaseSyncRequests(Oid dbid);
extern void DropRelationFiles(RelFileNode *delrels, int ndelrels, bool isRedo);
+extern void ptrack_sync(void);
/* md sync callbacks */
extern int mdsyncfiletag(const FileTag *ftag, char *path);