On 20/09/2020 23:44, Soumyadeep Chakraborty wrote:
> Before getting into the code review for the patch, I wanted to know why
> we don't use a Bitmapset for target_modified_pages?
Bitmapset is not available in client code. Perhaps it could be moved to
src/common with some changes, but it doesn't seem worth it until there's
more client code that would need it.
I'm not sure that a bitmap is the best data structure for tracking the
changed blocks in the first place. A hash table might be better if there
are only a few changed blocks, or something like
src/backend/lib/integerset.c if there are many. But as long as the
simple bitmap gets the job done, let's keep it simple.
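For illustration, the "simple bitmap" is roughly the following standalone sketch. It is not pg_rewind's datapagemap.c: `pagemap_t`, `pagemap_add()` and `pagemap_next()` are hypothetical names, and growing the bitmap to exactly the needed size is an assumption made to keep it short. It shows the trade-off mentioned above: memory use is proportional to the highest modified block number, not to the number of modified blocks.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uint32_t BlockNumber;

/* Hypothetical stand-in for datapagemap_t: bit N set = block N modified. */
typedef struct
{
    unsigned char *bitmap;
    size_t         bitmapsize;  /* size of 'bitmap' in bytes */
} pagemap_t;

/* Mark 'blkno' as modified, enlarging the bitmap if needed; -1 on OOM. */
static int
pagemap_add(pagemap_t *map, BlockNumber blkno)
{
    size_t offset = blkno / 8;
    int    bitno = blkno % 8;

    assert(map != NULL);
    if (offset >= map->bitmapsize)
    {
        size_t         newsize = offset + 1;
        unsigned char *p = realloc(map->bitmap, newsize);

        if (p == NULL)
            return -1;
        /* Zero only the newly added bytes; existing bits are preserved. */
        memset(p + map->bitmapsize, 0, newsize - map->bitmapsize);
        map->bitmap = p;
        map->bitmapsize = newsize;
    }
    map->bitmap[offset] |= (unsigned char) (1 << bitno);
    return 0;
}

/*
 * Iterate over modified blocks in ascending order. '*pos' is the iterator
 * state (start at 0). Returns 1 and sets '*blkno' for each modified block,
 * 0 when there are no more.
 */
static int
pagemap_next(const pagemap_t *map, size_t *pos, BlockNumber *blkno)
{
    while (*pos < map->bitmapsize * 8)
    {
        size_t bit = (*pos)++;

        if (map->bitmap[bit / 8] & (1 << (bit % 8)))
        {
            *blkno = (BlockNumber) bit;
            return 1;
        }
    }
    return 0;
}
```

With this layout, marking only block 100000 of a segment allocates 12501 bytes, which is where a hash table could win when only a few blocks change.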
> 2. Rename target_modified_pages to target_pages_to_overwrite?
> target_modified_pages can lead to confusion as to whether it includes pages
> that were modified on the target but not even present in the source and
> the other exclusionary cases. target_pages_to_overwrite is much clearer.
Agreed, I'll rename it.
Conceptually, while we're scanning source WAL, we're just making note of
the modified blocks. The decision on what to do with them happens only
later, in decide_file_action(). The difference is purely theoretical,
though. There is no real decision to be made; all the modified blocks
will be overwritten. So on the whole, I agree 'target_pages_to_overwrite'
is better.
> /*
>  * If this is a relation file, copy the modified blocks.
>  *
>  * This is in addition to any other changes.
>  */
> iter = datapagemap_iterate(&entry->target_modified_pages);
> while (datapagemap_next(iter, &blkno))
> {
>     offset = blkno * BLCKSZ;
>     source->queue_fetch_range(source, entry->path, offset, BLCKSZ);
> }
> pg_free(iter);
> Can we put this hunk into a static function overwrite_pages()?
Meh, it's only about 10 lines of code, and one caller.
> 4.
> * block that have changed in the target system. It makes note of all the
> * changed blocks in the pagemap of the file.
> Can we replace the above with:
> * block that has changed in the target system. It decides if the given
> * blkno in the target relfile needs to be overwritten from the source.
Ok. Again conceptually though, process_target_wal_block_change() just
collects information, and the decisions are made later. But you're right
that we do leave out truncated-away blocks here, so we are doing more
than just noting all the changed blocks.
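A standalone sketch of that filtering rule, mirroring the `end_offset` check in process_target_wal_block_change() in the attached patch. `block_needs_overwrite()` is a hypothetical name, and BLCKSZ is assumed to be the default 8192.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define BLCKSZ 8192             /* assumption: default PostgreSQL block size */

typedef uint32_t BlockNumber;

/*
 * Should a block that was modified in the target's WAL be overwritten with
 * the source's copy? Sizes are the segment file sizes in bytes.
 */
static bool
block_needs_overwrite(BlockNumber blkno_inseg, int64_t source_size,
                      int64_t target_size)
{
    /* Byte offset just past the end of the block. */
    int64_t end_offset = ((int64_t) blkno_inseg + 1) * BLCKSZ;

    assert(source_size >= 0 && target_size >= 0);

    /*
     * Past the source EOF: the file will be truncated, so the block goes away.
     * Past the target EOF: the block is fetched as part of the copied tail.
     */
    return end_offset <= source_size && end_offset <= target_size;
}
```

A block is recorded only if it lies entirely within both the source and the target file; everything else is covered by the TRUNCATE or COPY_TAIL action decided later.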
> /*
>  * Doesn't exist in either server. Why does it have an entry in the
>  * first place??
>  */
> return FILE_ACTION_NONE;
> Can we delete the above hunk and add the following assert to the very
> top of decide_file_action():
> Assert(entry->target_exists || entry->source_exists);
I would like to keep the check even when assertions are not enabled.
I'll add an Assert(false) there.
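To spell out the resulting control flow, here is a minimal sketch using a hypothetical, reduced action enum that ignores file types (the patch returns FILE_ACTION_CREATE rather than a copy for source-only directories and symlinks). The "neither exists" case trips the assertion in assert-enabled builds and falls through to a harmless no-op when assertions are compiled out with NDEBUG.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, reduced version of file_action_t. */
typedef enum
{
    ACTION_UNDECIDED,           /* exists on both: compare type and size */
    ACTION_NONE,
    ACTION_COPY,
    ACTION_REMOVE
} action_t;

static action_t
decide_by_existence(bool source_exists, bool target_exists)
{
    if (source_exists && !target_exists)
        return ACTION_COPY;     /* only in source: copy it over */
    if (!source_exists && target_exists)
        return ACTION_REMOVE;   /* only in target: remove it */
    if (!source_exists && !target_exists)
    {
        /*
         * Should never happen. Fails loudly in assert-enabled builds; with
         * NDEBUG, assert() expands to nothing and we do nothing.
         */
        assert(false);
        return ACTION_NONE;
    }
    return ACTION_UNDECIDED;
}
```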
> 7. Please address the FIXME for the symlink case:
> /* FIXME: Check if it points to the same target? */
It's not a new issue. Would be nice to fix, of course. I'm not sure what
the right thing to do would be. If you have e.g. replaced
postgresql.conf with a symlink that points outside the data directory,
would it be appropriate to overwrite it? Or perhaps we should throw an
error? We also throw an error if a file is a symlink in the source but a
regular file in the target, or vice versa.
> 8.
> * it anyway. But there's no harm in copying it now.)
> and
> * copy them here. But we don't know which scenario we're
> * dealing with, and there's no harm in copying the missing
> * blocks now, so do it now.
> Could you add a line or two explaining why there is "no harm" in these
> two hunks above?
The previous sentences explain that there's a WAL record covering them.
So they will be overwritten by WAL replay anyway. Does it need more
explanation?
> 14. queue_overwrite_range(), finish_overwrite() instead of
> queue_fetch_range(), finish_fetch()? Similarly update
> *_fetch_file_range() and *_finish_fetch()
> 15. Let's have local_source.c and libpq_source.c instead of *_fetch.c
Good idea! And fetch.h -> rewind_source.h.
I also moved the code in the execute_file_actions() function to
pg_rewind.c, into a new function: perform_rewind(). It felt a bit silly to
have just execute_file_actions() in a file of its own. perform_rewind() now
does all the modifications to the data directory, including writing the
backup label file. The exception is writing the recovery config: that also
needs to be done when there's no rewind to do, so it makes sense to keep it
separate. What do you think?
> 16.
> conn = PQconnectdb(connstr_source);
> if (PQstatus(conn) == CONNECTION_BAD)
>     pg_fatal("could not connect to server: %s",
>              PQerrorMessage(conn));
> if (showprogress)
>     pg_log_info("connected to server");
> The above hunk should be part of init_libpq_source(). Consequently,
> init_libpq_source() should take a connection string instead of a conn.
The libpq connection is also needed by WriteRecoveryConfig(); that's why
it's not fully encapsulated in libpq_source.
> 19.
> typedef struct
> {
>     const char *path;   /* path relative to data directory root */
>     uint64      offset;
>     uint32      length;
> } fetch_range_request;
> offset should be of type off_t
The 'offset' argument to the queue_fetch_range function is uint64, and
the argument to the SQL-callable pg_read_binary_file() is int8, so it's
consistent with them. Then again, the 'len' argument to
queue_fetch_range() is a size_t, and to pg_read_binary_file() int8, so
it's not fully consistent with that either. I'll try to make it more
consistent.
Thanks for the review! Attached is a new version of the patch set.
- Heikki
From ddc02fb949811c36c94bd42520ecdcb672493397 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Thu, 24 Sep 2020 12:23:39 +0300
Subject: [PATCH v3 1/5] pg_rewind: Move syncTargetDirectory() to file_ops.c
For consistency. All the other low-level functions that operate on the
target directory are in file_ops.c.
Reviewed-by: Michael Paquier
Discussion: https://www.postgresql.org/message-id/0c5b3783-af52-3ee5-f8fa-6e794061f70d%40iki.fi
---
src/bin/pg_rewind/file_ops.c | 19 +++++++++++++++++++
src/bin/pg_rewind/file_ops.h | 1 +
src/bin/pg_rewind/pg_rewind.c | 22 +---------------------
src/bin/pg_rewind/pg_rewind.h | 1 +
4 files changed, 22 insertions(+), 21 deletions(-)
diff --git a/src/bin/pg_rewind/file_ops.c b/src/bin/pg_rewind/file_ops.c
index b3bf091c546..55439db20ba 100644
--- a/src/bin/pg_rewind/file_ops.c
+++ b/src/bin/pg_rewind/file_ops.c
@@ -19,6 +19,7 @@
#include <unistd.h>
#include "common/file_perm.h"
+#include "common/file_utils.h"
#include "file_ops.h"
#include "filemap.h"
#include "pg_rewind.h"
@@ -266,6 +267,24 @@ remove_target_symlink(const char *path)
dstpath);
}
+/*
+ * Sync target data directory to ensure that modifications are safely on disk.
+ *
+ * We do this once, for the whole data directory, for performance reasons. At
+ * the end of pg_rewind's run, the kernel is likely to already have flushed
+ * most dirty buffers to disk. Additionally fsync_pgdata uses a two-pass
+ * approach (only initiating writeback in the first pass), which often reduces
+ * the overall amount of IO noticeably.
+ */
+void
+sync_target_dir(void)
+{
+ if (!do_sync || dry_run)
+ return;
+
+ fsync_pgdata(datadir_target, PG_VERSION_NUM);
+}
+
/*
* Read a file into memory. The file to be read is <datadir>/<path>.
diff --git a/src/bin/pg_rewind/file_ops.h b/src/bin/pg_rewind/file_ops.h
index 025f24141c9..d8466385cf5 100644
--- a/src/bin/pg_rewind/file_ops.h
+++ b/src/bin/pg_rewind/file_ops.h
@@ -19,6 +19,7 @@ extern void remove_target_file(const char *path, bool missing_ok);
extern void truncate_target_file(const char *path, off_t newsize);
extern void create_target(file_entry_t *t);
extern void remove_target(file_entry_t *t);
+extern void sync_target_dir(void);
extern char *slurpFile(const char *datadir, const char *path, size_t *filesize);
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index 0ec52cb0327..5a7ab764db4 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -20,7 +20,6 @@
#include "catalog/pg_control.h"
#include "common/controldata_utils.h"
#include "common/file_perm.h"
-#include "common/file_utils.h"
#include "common/restricted_token.h"
#include "common/string.h"
#include "fe_utils/recovery_gen.h"
@@ -38,7 +37,6 @@ static void createBackupLabel(XLogRecPtr startpoint, TimeLineID starttli,
static void digestControlFile(ControlFileData *ControlFile, char *source,
size_t size);
-static void syncTargetDirectory(void);
static void getRestoreCommand(const char *argv0);
static void sanityChecks(void);
static void findCommonAncestorTimeline(XLogRecPtr *recptr, int *tliIndex);
@@ -455,7 +453,7 @@ main(int argc, char **argv)
if (showprogress)
pg_log_info("syncing target data directory");
- syncTargetDirectory();
+ sync_target_dir();
if (writerecoveryconf && !dry_run)
WriteRecoveryConfig(conn, datadir_target,
@@ -803,24 +801,6 @@ digestControlFile(ControlFileData *ControlFile, char *src, size_t size)
checkControlFile(ControlFile);
}
-/*
- * Sync target data directory to ensure that modifications are safely on disk.
- *
- * We do this once, for the whole data directory, for performance reasons. At
- * the end of pg_rewind's run, the kernel is likely to already have flushed
- * most dirty buffers to disk. Additionally fsync_pgdata uses a two-pass
- * approach (only initiating writeback in the first pass), which often reduces
- * the overall amount of IO noticeably.
- */
-static void
-syncTargetDirectory(void)
-{
- if (!do_sync || dry_run)
- return;
-
- fsync_pgdata(datadir_target, PG_VERSION_NUM);
-}
-
/*
* Get value of GUC parameter restore_command from the target cluster.
*
diff --git a/src/bin/pg_rewind/pg_rewind.h b/src/bin/pg_rewind/pg_rewind.h
index 8a9319ed675..67f90c2a38c 100644
--- a/src/bin/pg_rewind/pg_rewind.h
+++ b/src/bin/pg_rewind/pg_rewind.h
@@ -24,6 +24,7 @@ extern char *datadir_source;
extern char *connstr_source;
extern bool showprogress;
extern bool dry_run;
+extern bool do_sync;
extern int WalSegSz;
/* Target history */
--
2.20.1
From 53670bbfa8c7f19eaafa8885d1638298edbf2b71 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Thu, 24 Sep 2020 20:08:13 +0300
Subject: [PATCH v3 2/5] Refactor pg_rewind for more clear decision making.
Deciding what to do with each file is now a separate step after all the
necessary information has been gathered. It is clearer that way.
Previously, the decision-making was divided between process_source_file()
and process_target_file(), and it was a bit hard to piece together what the
overall rules were.
Reviewed-by: Kyotaro Horiguchi, Soumyadeep Chakraborty
Discussion: https://www.postgresql.org/message-id/0c5b3783-af52-3ee5-f8fa-6e794061f70d%40iki.fi
---
src/bin/pg_rewind/copy_fetch.c | 14 +-
src/bin/pg_rewind/file_ops.c | 16 +-
src/bin/pg_rewind/filemap.c | 571 +++++++++++++++++---------------
src/bin/pg_rewind/filemap.h | 69 ++--
src/bin/pg_rewind/libpq_fetch.c | 12 +-
src/bin/pg_rewind/parsexlog.c | 2 +-
src/bin/pg_rewind/pg_rewind.c | 8 +-
7 files changed, 381 insertions(+), 311 deletions(-)
diff --git a/src/bin/pg_rewind/copy_fetch.c b/src/bin/pg_rewind/copy_fetch.c
index 1edab5f1867..e4b8ce6aaf4 100644
--- a/src/bin/pg_rewind/copy_fetch.c
+++ b/src/bin/pg_rewind/copy_fetch.c
@@ -210,7 +210,7 @@ copy_executeFileMap(filemap_t *map)
for (i = 0; i < map->narray; i++)
{
entry = map->array[i];
- execute_pagemap(&entry->pagemap, entry->path);
+ execute_pagemap(&entry->target_pages_to_overwrite, entry->path);
switch (entry->action)
{
@@ -219,16 +219,16 @@ copy_executeFileMap(filemap_t *map)
break;
case FILE_ACTION_COPY:
- rewind_copy_file_range(entry->path, 0, entry->newsize, true);
+ rewind_copy_file_range(entry->path, 0, entry->source_size, true);
break;
case FILE_ACTION_TRUNCATE:
- truncate_target_file(entry->path, entry->newsize);
+ truncate_target_file(entry->path, entry->source_size);
break;
case FILE_ACTION_COPY_TAIL:
- rewind_copy_file_range(entry->path, entry->oldsize,
- entry->newsize, false);
+ rewind_copy_file_range(entry->path, entry->target_size,
+ entry->source_size, false);
break;
case FILE_ACTION_CREATE:
@@ -238,6 +238,10 @@ copy_executeFileMap(filemap_t *map)
case FILE_ACTION_REMOVE:
remove_target(entry);
break;
+
+ case FILE_ACTION_UNDECIDED:
+ pg_fatal("no action decided for \"%s\"", entry->path);
+ break;
}
}
diff --git a/src/bin/pg_rewind/file_ops.c b/src/bin/pg_rewind/file_ops.c
index 55439db20ba..ec37d0b2e0d 100644
--- a/src/bin/pg_rewind/file_ops.c
+++ b/src/bin/pg_rewind/file_ops.c
@@ -126,8 +126,9 @@ void
remove_target(file_entry_t *entry)
{
Assert(entry->action == FILE_ACTION_REMOVE);
+ Assert(entry->target_exists);
- switch (entry->type)
+ switch (entry->target_type)
{
case FILE_TYPE_DIRECTORY:
remove_target_dir(entry->path);
@@ -140,6 +141,10 @@ remove_target(file_entry_t *entry)
case FILE_TYPE_SYMLINK:
remove_target_symlink(entry->path);
break;
+
+ case FILE_TYPE_UNDEFINED:
+ pg_fatal("undefined file type for \"%s\"", entry->path);
+ break;
}
}
@@ -147,21 +152,26 @@ void
create_target(file_entry_t *entry)
{
Assert(entry->action == FILE_ACTION_CREATE);
+ Assert(!entry->target_exists);
- switch (entry->type)
+ switch (entry->source_type)
{
case FILE_TYPE_DIRECTORY:
create_target_dir(entry->path);
break;
case FILE_TYPE_SYMLINK:
- create_target_symlink(entry->path, entry->link_target);
+ create_target_symlink(entry->path, entry->source_link_target);
break;
case FILE_TYPE_REGULAR:
/* can't happen. Regular files are created with open_target_file. */
pg_fatal("invalid action (CREATE) for regular file");
break;
+
+ case FILE_TYPE_UNDEFINED:
+ pg_fatal("undefined file type for \"%s\"", entry->path);
+ break;
}
}
diff --git a/src/bin/pg_rewind/filemap.c b/src/bin/pg_rewind/filemap.c
index 1abc257177e..79e5bfdc7d1 100644
--- a/src/bin/pg_rewind/filemap.c
+++ b/src/bin/pg_rewind/filemap.c
@@ -26,6 +26,8 @@ static bool isRelDataFile(const char *path);
static char *datasegpath(RelFileNode rnode, ForkNumber forknum,
BlockNumber segno);
static int path_cmp(const void *a, const void *b);
+
+static file_entry_t *get_filemap_entry(const char *path, bool create);
static int final_filemap_cmp(const void *a, const void *b);
static void filemap_list_to_array(filemap_t *map);
static bool check_file_excluded(const char *path, bool is_source);
@@ -146,33 +148,79 @@ filemap_create(void)
filemap = map;
}
+/* Look up or create entry for 'path' */
+static file_entry_t *
+get_filemap_entry(const char *path, bool create)
+{
+ filemap_t *map = filemap;
+ file_entry_t *entry;
+ file_entry_t **e;
+ file_entry_t key;
+ file_entry_t *key_ptr;
+
+ if (map->array)
+ {
+ key.path = (char *) path;
+ key_ptr = &key;
+ e = bsearch(&key_ptr, map->array, map->narray, sizeof(file_entry_t *),
+ path_cmp);
+ }
+ else
+ e = NULL;
+
+ if (e)
+ entry = *e;
+ else if (!create)
+ entry = NULL;
+ else
+ {
+ /* Create a new entry for this file */
+ entry = pg_malloc(sizeof(file_entry_t));
+ entry->path = pg_strdup(path);
+ entry->isrelfile = isRelDataFile(path);
+ entry->action = FILE_ACTION_UNDECIDED;
+
+ entry->target_exists = false;
+ entry->target_type = FILE_TYPE_UNDEFINED;
+ entry->target_size = 0;
+ entry->target_link_target = NULL;
+ entry->target_pages_to_overwrite.bitmap = NULL;
+ entry->target_pages_to_overwrite.bitmapsize = 0;
+
+ entry->source_exists = false;
+ entry->source_type = FILE_TYPE_UNDEFINED;
+ entry->source_size = 0;
+ entry->source_link_target = NULL;
+
+ entry->next = NULL;
+
+ if (map->last)
+ {
+ map->last->next = entry;
+ map->last = entry;
+ }
+ else
+ map->first = map->last = entry;
+ map->nlist++;
+ }
+
+ return entry;
+}
+
/*
* Callback for processing source file list.
*
- * This is called once for every file in the source server. We decide what
- * action needs to be taken for the file, depending on whether the file
- * exists in the target and whether the size matches.
+ * This is called once for every file in the source server. We record the
+ * type and size of the file, so that decide_file_action() can later decide
+ * what to do with it.
*/
void
-process_source_file(const char *path, file_type_t type, size_t newsize,
+process_source_file(const char *path, file_type_t type, size_t size,
const char *link_target)
{
- bool exists;
- char localpath[MAXPGPATH];
- struct stat statbuf;
- filemap_t *map = filemap;
- file_action_t action = FILE_ACTION_NONE;
- size_t oldsize = 0;
file_entry_t *entry;
- Assert(map->array == NULL);
-
- /*
- * Skip any files matching the exclusion filters. This has the effect to
- * remove all those files on the target.
- */
- if (check_file_excluded(path, true))
- return;
+ Assert(filemap->array == NULL);
/*
* Pretend that pg_wal is a directory, even if it's really a symlink. We
@@ -182,16 +230,6 @@ process_source_file(const char *path, file_type_t type, size_t newsize,
if (strcmp(path, "pg_wal") == 0 && type == FILE_TYPE_SYMLINK)
type = FILE_TYPE_DIRECTORY;
- /*
- * Skip temporary files, .../pgsql_tmp/... and .../pgsql_tmp.* in source.
- * This has the effect that all temporary files in the destination will be
- * removed.
- */
- if (strstr(path, "/" PG_TEMP_FILE_PREFIX) != NULL)
- return;
- if (strstr(path, "/" PG_TEMP_FILES_DIR "/") != NULL)
- return;
-
/*
* sanity check: a filename that looks like a data file better be a
* regular file
@@ -199,158 +237,25 @@ process_source_file(const char *path, file_type_t type, size_t newsize,
if (type != FILE_TYPE_REGULAR && isRelDataFile(path))
pg_fatal("data file \"%s\" in source is not a regular file", path);
- snprintf(localpath, sizeof(localpath), "%s/%s", datadir_target, path);
-
- /* Does the corresponding file exist in the target data dir? */
- if (lstat(localpath, &statbuf) < 0)
- {
- if (errno != ENOENT)
- pg_fatal("could not stat file \"%s\": %m",
- localpath);
-
- exists = false;
- }
- else
- exists = true;
-
- switch (type)
- {
- case FILE_TYPE_DIRECTORY:
- if (exists && !S_ISDIR(statbuf.st_mode) && strcmp(path, "pg_wal") != 0)
- {
- /* it's a directory in source, but not in target. Strange.. */
- pg_fatal("\"%s\" is not a directory", localpath);
- }
-
- if (!exists)
- action = FILE_ACTION_CREATE;
- else
- action = FILE_ACTION_NONE;
- oldsize = 0;
- break;
-
- case FILE_TYPE_SYMLINK:
- if (exists &&
-#ifndef WIN32
- !S_ISLNK(statbuf.st_mode)
-#else
- !pgwin32_is_junction(localpath)
-#endif
- )
- {
- /*
- * It's a symbolic link in source, but not in target.
- * Strange..
- */
- pg_fatal("\"%s\" is not a symbolic link", localpath);
- }
-
- if (!exists)
- action = FILE_ACTION_CREATE;
- else
- action = FILE_ACTION_NONE;
- oldsize = 0;
- break;
-
- case FILE_TYPE_REGULAR:
- if (exists && !S_ISREG(statbuf.st_mode))
- pg_fatal("\"%s\" is not a regular file", localpath);
-
- if (!exists || !isRelDataFile(path))
- {
- /*
- * File exists in source, but not in target. Or it's a
- * non-data file that we have no special processing for. Copy
- * it in toto.
- *
- * An exception: PG_VERSIONs should be identical, but avoid
- * overwriting it for paranoia.
- */
- if (pg_str_endswith(path, "PG_VERSION"))
- {
- action = FILE_ACTION_NONE;
- oldsize = statbuf.st_size;
- }
- else
- {
- action = FILE_ACTION_COPY;
- oldsize = 0;
- }
- }
- else
- {
- /*
- * It's a data file that exists in both.
- *
- * If it's larger in target, we can truncate it. There will
- * also be a WAL record of the truncation in the source
- * system, so WAL replay would eventually truncate the target
- * too, but we might as well do it now.
- *
- * If it's smaller in the target, it means that it has been
- * truncated in the target, or enlarged in the source, or
- * both. If it was truncated in the target, we need to copy
- * the missing tail from the source system. If it was enlarged
- * in the source system, there will be WAL records in the
- * source system for the new blocks, so we wouldn't need to
- * copy them here. But we don't know which scenario we're
- * dealing with, and there's no harm in copying the missing
- * blocks now, so do it now.
- *
- * If it's the same size, do nothing here. Any blocks modified
- * in the target will be copied based on parsing the target
- * system's WAL, and any blocks modified in the source will be
- * updated after rewinding, when the source system's WAL is
- * replayed.
- */
- oldsize = statbuf.st_size;
- if (oldsize < newsize)
- action = FILE_ACTION_COPY_TAIL;
- else if (oldsize > newsize)
- action = FILE_ACTION_TRUNCATE;
- else
- action = FILE_ACTION_NONE;
- }
- break;
- }
-
- /* Create a new entry for this file */
- entry = pg_malloc(sizeof(file_entry_t));
- entry->path = pg_strdup(path);
- entry->type = type;
- entry->action = action;
- entry->oldsize = oldsize;
- entry->newsize = newsize;
- entry->link_target = link_target ? pg_strdup(link_target) : NULL;
- entry->next = NULL;
- entry->pagemap.bitmap = NULL;
- entry->pagemap.bitmapsize = 0;
- entry->isrelfile = isRelDataFile(path);
-
- if (map->last)
- {
- map->last->next = entry;
- map->last = entry;
- }
- else
- map->first = map->last = entry;
- map->nlist++;
+ /* Remember this source file */
+ entry = get_filemap_entry(path, true);
+ entry->source_exists = true;
+ entry->source_type = type;
+ entry->source_size = size;
+ entry->source_link_target = link_target ? pg_strdup(link_target) : NULL;
}
/*
* Callback for processing target file list.
*
- * All source files must be already processed before calling this. This only
- * marks target data directory's files that didn't exist in the source for
- * deletion.
+ * All source files must already have been processed before calling this. We
+ * record the type and size of the file, so that decide_file_action() can
+ * later decide what to do with it.
*/
void
-process_target_file(const char *path, file_type_t type, size_t oldsize,
+process_target_file(const char *path, file_type_t type, size_t size,
const char *link_target)
{
- bool exists;
- file_entry_t key;
- file_entry_t *key_ptr;
filemap_t *map = filemap;
file_entry_t *entry;
@@ -359,7 +264,6 @@ process_target_file(const char *path, file_type_t type, size_t oldsize,
* from the target data folder all paths which have been filtered out from
* the source data folder when processing the source files.
*/
-
if (map->array == NULL)
{
/* on first call, initialize lookup array */
@@ -377,120 +281,77 @@ process_target_file(const char *path, file_type_t type, size_t oldsize,
}
/*
- * Like in process_source_file, pretend that xlog is always a directory.
+ * Like in process_source_file, pretend that pg_wal is always a directory.
*/
if (strcmp(path, "pg_wal") == 0 && type == FILE_TYPE_SYMLINK)
type = FILE_TYPE_DIRECTORY;
- key.path = (char *) path;
- key_ptr = &key;
- exists = (bsearch(&key_ptr, map->array, map->narray, sizeof(file_entry_t *),
- path_cmp) != NULL);
-
- /* Remove any file or folder that doesn't exist in the source system. */
- if (!exists)
- {
- entry = pg_malloc(sizeof(file_entry_t));
- entry->path = pg_strdup(path);
- entry->type = type;
- entry->action = FILE_ACTION_REMOVE;
- entry->oldsize = oldsize;
- entry->newsize = 0;
- entry->link_target = link_target ? pg_strdup(link_target) : NULL;
- entry->next = NULL;
- entry->pagemap.bitmap = NULL;
- entry->pagemap.bitmapsize = 0;
- entry->isrelfile = isRelDataFile(path);
-
- if (map->last == NULL)
- map->first = entry;
- else
- map->last->next = entry;
- map->last = entry;
- map->nlist++;
- }
- else
- {
- /*
- * We already handled all files that exist in the source system in
- * process_source_file().
- */
- }
+ /* Remember this target file */
+ entry = get_filemap_entry(path, true);
+ entry->target_exists = true;
+ entry->target_type = type;
+ entry->target_size = size;
+ entry->target_link_target = link_target ? pg_strdup(link_target) : NULL;
}
/*
* This callback gets called while we read the WAL in the target, for every
- * block that have changed in the target system. It makes note of all the
- * changed blocks in the pagemap of the file.
+ * block that has changed in the target system. It decides if the given
+ * 'blkno' in the target relfile needs to be overwritten from the source, and
+ * if so, records it in the 'target_pages_to_overwrite' bitmap.
+ *
+ * NOTE: All the files on both systems must have already been added to the
+ * file map!
*/
void
-process_block_change(ForkNumber forknum, RelFileNode rnode, BlockNumber blkno)
+process_target_wal_block_change(ForkNumber forknum, RelFileNode rnode,
+ BlockNumber blkno)
{
char *path;
- file_entry_t key;
- file_entry_t *key_ptr;
file_entry_t *entry;
BlockNumber blkno_inseg;
int segno;
- filemap_t *map = filemap;
- file_entry_t **e;
- Assert(map->array);
+ Assert(filemap->array);
segno = blkno / RELSEG_SIZE;
blkno_inseg = blkno % RELSEG_SIZE;
path = datasegpath(rnode, forknum, segno);
-
- key.path = (char *) path;
- key_ptr = &key;
-
- e = bsearch(&key_ptr, map->array, map->narray, sizeof(file_entry_t *),
- path_cmp);
- if (e)
- entry = *e;
- else
- entry = NULL;
+ entry = get_filemap_entry(path, false);
pfree(path);
if (entry)
{
- Assert(entry->isrelfile);
+ int64 end_offset;
- switch (entry->action)
- {
- case FILE_ACTION_NONE:
- case FILE_ACTION_TRUNCATE:
- /* skip if we're truncating away the modified block anyway */
- if ((blkno_inseg + 1) * BLCKSZ <= entry->newsize)
- datapagemap_add(&entry->pagemap, blkno_inseg);
- break;
-
- case FILE_ACTION_COPY_TAIL:
-
- /*
- * skip the modified block if it is part of the "tail" that
- * we're copying anyway.
- */
- if ((blkno_inseg + 1) * BLCKSZ <= entry->oldsize)
- datapagemap_add(&entry->pagemap, blkno_inseg);
- break;
+ Assert(entry->isrelfile);
- case FILE_ACTION_COPY:
- case FILE_ACTION_REMOVE:
- break;
+ if (entry->target_type != FILE_TYPE_REGULAR)
+ pg_fatal("unexpected page modification for non-regular file \"%s\"",
+ entry->path);
- case FILE_ACTION_CREATE:
- pg_fatal("unexpected page modification for directory or symbolic link \"%s\"", entry->path);
- }
+ /*
+ * If the block is beyond the EOF in the source system, no need to
+ * remember it now, because we're going to truncate it away from the
+ * target anyway. Also no need to remember the block if it's beyond
+ * the current EOF in the target system; we will copy it over with the
+ * "tail" from the source system, anyway.
+ */
+ end_offset = (blkno_inseg + 1) * BLCKSZ;
+ if (end_offset <= entry->source_size &&
+ end_offset <= entry->target_size)
+ datapagemap_add(&entry->target_pages_to_overwrite, blkno_inseg);
}
else
{
/*
* If we don't have any record of this file in the file map, it means
- * that it's a relation that doesn't exist in the source system, and
- * it was subsequently removed in the target system, too. We can
- * safely ignore it.
+ * that it's a relation that doesn't exist in the source system. It
+ * could exist in the target system; we haven't moved the target-only
+ * entries from the linked list to the array yet! But in any case, if
+ * it doesn't exist in the source it will be removed from the target
+ * too, and we can safely ignore it.
*/
}
}
@@ -505,6 +366,15 @@ check_file_excluded(const char *path, bool is_source)
int excludeIdx;
const char *filename;
+ /*
+ * Skip all temporary files, .../pgsql_tmp/... and .../pgsql_tmp.*
+ */
+ if (strstr(path, "/" PG_TEMP_FILE_PREFIX) != NULL ||
+ strstr(path, "/" PG_TEMP_FILES_DIR "/") != NULL)
+ {
+ return true;
+ }
+
/* check individual files... */
for (excludeIdx = 0; excludeFiles[excludeIdx].name != NULL; excludeIdx++)
{
@@ -581,16 +451,6 @@ filemap_list_to_array(filemap_t *map)
map->first = map->last = NULL;
}
-void
-filemap_finalize(void)
-{
- filemap_t *map = filemap;
-
- filemap_list_to_array(map);
- qsort(map->array, map->narray, sizeof(file_entry_t *),
- final_filemap_cmp);
-}
-
static const char *
action_to_str(file_action_t action)
{
@@ -631,26 +491,26 @@ calculate_totals(void)
{
entry = map->array[i];
- if (entry->type != FILE_TYPE_REGULAR)
+ if (entry->source_type != FILE_TYPE_REGULAR)
continue;
- map->total_size += entry->newsize;
+ map->total_size += entry->source_size;
if (entry->action == FILE_ACTION_COPY)
{
- map->fetch_size += entry->newsize;
+ map->fetch_size += entry->source_size;
continue;
}
if (entry->action == FILE_ACTION_COPY_TAIL)
- map->fetch_size += (entry->newsize - entry->oldsize);
+ map->fetch_size += (entry->source_size - entry->target_size);
- if (entry->pagemap.bitmapsize > 0)
+ if (entry->target_pages_to_overwrite.bitmapsize > 0)
{
datapagemap_iterator_t *iter;
BlockNumber blk;
- iter = datapagemap_iterate(&entry->pagemap);
+ iter = datapagemap_iterate(&entry->target_pages_to_overwrite);
while (datapagemap_next(iter, &blk))
map->fetch_size += BLCKSZ;
@@ -670,13 +530,13 @@ print_filemap(void)
{
entry = map->array[i];
if (entry->action != FILE_ACTION_NONE ||
- entry->pagemap.bitmapsize > 0)
+ entry->target_pages_to_overwrite.bitmapsize > 0)
{
pg_log_debug("%s (%s)", entry->path,
action_to_str(entry->action));
- if (entry->pagemap.bitmapsize > 0)
- datapagemap_print(&entry->pagemap);
+ if (entry->target_pages_to_overwrite.bitmapsize > 0)
+ datapagemap_print(&entry->target_pages_to_overwrite);
}
}
fflush(stdout);
@@ -825,3 +685,170 @@ final_filemap_cmp(const void *a, const void *b)
else
return strcmp(fa->path, fb->path);
}
+
+/*
+ * Decide what action to perform on a file.
+ */
+static file_action_t
+decide_file_action(file_entry_t *entry)
+{
+ const char *path = entry->path;
+
+ /*
+ * Don't touch the control file. It is handled specially, after copying
+ * all the other files.
+ */
+ if (strcmp(path, "global/pg_control") == 0)
+ return FILE_ACTION_NONE;
+
+ /*
+ * Remove all files matching the exclusion filters in the target.
+ */
+ if (check_file_excluded(path, true))
+ {
+ if (entry->target_exists)
+ return FILE_ACTION_REMOVE;
+ else
+ return FILE_ACTION_NONE;
+ }
+
+ /*
+ * Handle cases where the file is missing from one of the systems.
+ */
+ if (!entry->target_exists && entry->source_exists)
+ {
+ /*
+ * File exists in source, but not in target. Copy it in toto. (If it's
+ * a relation data file, WAL replay after rewinding should re-create
+ * it anyway. But there's no harm in copying it now.)
+ */
+ switch (entry->source_type)
+ {
+ case FILE_TYPE_DIRECTORY:
+ case FILE_TYPE_SYMLINK:
+ return FILE_ACTION_CREATE;
+ case FILE_TYPE_REGULAR:
+ return FILE_ACTION_COPY;
+ case FILE_TYPE_UNDEFINED:
+ pg_fatal("unknown file type for \"%s\"", entry->path);
+ break;
+ }
+ }
+ else if (entry->target_exists && !entry->source_exists)
+ {
+ /* File exists in target, but not source. Remove it. */
+ return FILE_ACTION_REMOVE;
+ }
+ else if (!entry->target_exists && !entry->source_exists)
+ {
+ /*
+ * Doesn't exist in either server. Why does it have an entry in the
+ * first place??
+ */
+ Assert(false);
+ return FILE_ACTION_NONE;
+ }
+
+ /*
+ * Otherwise, the file exists on both systems
+ */
+ Assert(entry->target_exists && entry->source_exists);
+
+ if (entry->source_type != entry->target_type)
+ {
+ /* But it's a different kind of object. Strange.. */
+ pg_fatal("file \"%s\" is of different type in source and target", entry->path);
+ }
+
+ /*
+ * PG_VERSION files should be identical on both systems, but avoid
+ * overwriting them for paranoia.
+ */
+ if (pg_str_endswith(entry->path, "PG_VERSION"))
+ return FILE_ACTION_NONE;
+
+ switch (entry->source_type)
+ {
+ case FILE_TYPE_DIRECTORY:
+ return FILE_ACTION_NONE;
+
+ case FILE_TYPE_SYMLINK:
+ /*
+ * XXX: Should we check if it points to the same target?
+ */
+ return FILE_ACTION_NONE;
+
+ case FILE_TYPE_REGULAR:
+ if (!entry->isrelfile)
+ {
+ /*
+ * It's a non-data file that we have no special processing
+ * for. Copy it in toto.
+ */
+ return FILE_ACTION_COPY;
+ }
+ else
+ {
+ /*
+ * It's a data file that exists in both systems.
+ *
+ * If it's larger in target, we can truncate it. There will
+ * also be a WAL record of the truncation in the source
+ * system, so WAL replay would eventually truncate the target
+ * too, but we might as well do it now.
+ *
+ * If it's smaller in the target, it means that it has been
+ * truncated in the target, or enlarged in the source, or
+ * both. If it was truncated in the target, we need to copy
+ * the missing tail from the source system. If it was enlarged
+ * in the source system, there will be WAL records in the
+ * source system for the new blocks, so we wouldn't need to
+ * copy them here. But we don't know which scenario we're
+ * dealing with, and there's no harm in copying the missing
+ * blocks now, so do it now.
+ *
+ * If it's the same size, do nothing here. Any blocks modified
+ * in the target will be copied based on parsing the target
+ * system's WAL, and any blocks modified in the source will be
+ * updated after rewinding, when the source system's WAL is
+ * replayed.
+ */
+ if (entry->target_size < entry->source_size)
+ return FILE_ACTION_COPY_TAIL;
+ else if (entry->target_size > entry->source_size)
+ return FILE_ACTION_TRUNCATE;
+ else
+ return FILE_ACTION_NONE;
+ }
+ break;
+
+ case FILE_TYPE_UNDEFINED:
+ pg_fatal("unknown file type for \"%s\"", path);
+ break;
+ }
+
+ /* unreachable */
+ pg_fatal("could not decide what to do with file \"%s\"", path);
+}
+
+/*
+ * Decide what to do with each file.
+ */
+void
+decide_file_actions()
+{
+ int i;
+
+ filemap_list_to_array(filemap);
+
+ for (i = 0; i < filemap->narray; i++)
+ {
+ file_entry_t *entry = filemap->array[i];
+
+ entry->action = decide_file_action(entry);
+ }
+
+ /* Sort the actions to the order that they should be performed */
+ qsort(filemap->array, filemap->narray, sizeof(file_entry_t *),
+ final_filemap_cmp);
+}
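The existence and size rules that decide_file_action() applies to a relation data file can be condensed into a small standalone function. This is a sketch, not pg_rewind code: the `sk_` names and the trimmed-down struct are hypothetical. It assumes that the source-only branch (only partly visible above) copies a regular file in full. Blocks modified in the target are deliberately ignored here, because they are tracked separately in the page map.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical action codes mirroring a subset of file_action_t. */
typedef enum
{
	SK_ACTION_COPY,				/* copy whole file */
	SK_ACTION_COPY_TAIL,		/* fetch bytes [target_size, source_size) */
	SK_ACTION_NONE,				/* rely on page map + WAL replay */
	SK_ACTION_TRUNCATE,			/* cut target back to source_size */
	SK_ACTION_REMOVE			/* file exists only in the target */
} sk_action;

/* Hypothetical, trimmed-down view of file_entry_t for a relation file. */
typedef struct
{
	bool		target_exists;
	bool		source_exists;
	size_t		target_size;
	size_t		source_size;
} sk_relfile;

static sk_action
sk_decide_relfile_action(const sk_relfile *e)
{
	/* an entry only exists because at least one side has the file */
	assert(e->target_exists || e->source_exists);

	if (!e->target_exists)
		return SK_ACTION_COPY;
	if (!e->source_exists)
		return SK_ACTION_REMOVE;

	if (e->target_size < e->source_size)
		return SK_ACTION_COPY_TAIL;
	if (e->target_size > e->source_size)
		return SK_ACTION_TRUNCATE;
	return SK_ACTION_NONE;
}
```

Copying the tail eagerly when the target is shorter is safe whichever way the size difference arose, which is why the real code does not try to distinguish "truncated in target" from "extended in source".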
diff --git a/src/bin/pg_rewind/filemap.h b/src/bin/pg_rewind/filemap.h
index 0cb7425170c..73c687e4e34 100644
--- a/src/bin/pg_rewind/filemap.h
+++ b/src/bin/pg_rewind/filemap.h
@@ -14,17 +14,21 @@
/*
* For every file found in the local or remote system, we have a file entry
- * which says what we are going to do with the file. For relation files,
- * there is also a page map, marking pages in the file that were changed
- * locally.
- *
- * The enum values are sorted in the order we want actions to be processed.
+ * that contains information about the file on both systems. For relation
+ * files, there is also a page map that marks pages in the file that were
+ * changed in the target after the last common checkpoint. Each entry also
+ * contains an 'action' field, which says what we are going to do with the
+ * file.
*/
+
+/* these enum values are sorted in the order we want actions to be processed */
typedef enum
{
+ FILE_ACTION_UNDECIDED = 0, /* not decided yet */
+
FILE_ACTION_CREATE, /* create local directory or symbolic link */
FILE_ACTION_COPY, /* copy whole file, overwriting if exists */
- FILE_ACTION_COPY_TAIL, /* copy tail from 'oldsize' to 'newsize' */
+ FILE_ACTION_COPY_TAIL, /* copy tail from 'target_size' to 'source_size' */
FILE_ACTION_NONE, /* no action (we might still copy modified
* blocks based on the parsed WAL) */
FILE_ACTION_TRUNCATE, /* truncate local file to 'newsize' bytes */
@@ -33,6 +37,8 @@ typedef enum
typedef enum
{
+ FILE_TYPE_UNDEFINED = 0,
+
FILE_TYPE_REGULAR,
FILE_TYPE_DIRECTORY,
FILE_TYPE_SYMLINK
@@ -41,19 +47,34 @@ typedef enum
typedef struct file_entry_t
{
char *path;
- file_type_t type;
+ bool isrelfile; /* is it a relation data file? */
- file_action_t action;
+ /*
+ * Status of the file in the target.
+ */
+ bool target_exists;
+ file_type_t target_type;
+ size_t target_size; /* for a regular file */
+ char *target_link_target; /* for a symlink */
- /* for a regular file */
- size_t oldsize;
- size_t newsize;
- bool isrelfile; /* is it a relation data file? */
+ /*
+ * Pages that were modified in the target and need to be replaced
+ * from the source.
+ */
+ datapagemap_t target_pages_to_overwrite;
- datapagemap_t pagemap;
+ /*
+ * Status of the file in the source.
+ */
+ bool source_exists;
+ file_type_t source_type;
+ size_t source_size;
+ char *source_link_target; /* for a symlink */
- /* for a symlink */
- char *link_target;
+ /*
+ * What will we do to the file?
+ */
+ file_action_t action;
struct file_entry_t *next;
} file_entry_t;
@@ -79,11 +100,10 @@ typedef struct filemap_t
int narray; /* current length of array */
/*
- * Summary information. total_size is the total size of the source
- * cluster, and fetch_size is the number of bytes that needs to be copied.
+ * Summary information.
*/
- uint64 total_size;
- uint64 fetch_size;
+ uint64 total_size; /* total size of the source cluster */
+ uint64 fetch_size; /* number of bytes that needs to be copied */
} filemap_t;
extern filemap_t *filemap;
@@ -94,11 +114,12 @@ extern void print_filemap(void);
/* Functions for populating the filemap */
extern void process_source_file(const char *path, file_type_t type,
- size_t newsize, const char *link_target);
+ size_t size, const char *link_target);
extern void process_target_file(const char *path, file_type_t type,
- size_t newsize, const char *link_target);
-extern void process_block_change(ForkNumber forknum, RelFileNode rnode,
- BlockNumber blkno);
-extern void filemap_finalize(void);
+ size_t size, const char *link_target);
+extern void process_target_wal_block_change(ForkNumber forknum,
+ RelFileNode rnode,
+ BlockNumber blkno);
+extern void decide_file_actions(void);
#endif /* FILEMAP_H */
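Because the file_action_t values are declared in execution order, sorting the entries by action yields the execution plan. The comparator below is a sketch of how that can be done with qsort(). It assumes (this is not shown in the diff) that ties are broken by path, and that removals are sorted in reverse path order so that a directory's contents are removed before the directory itself. The `ord_` names are hypothetical.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical action enum: declaration order is execution order. */
typedef enum
{
	ORD_CREATE,
	ORD_COPY,
	ORD_COPY_TAIL,
	ORD_NONE,
	ORD_TRUNCATE,
	ORD_REMOVE
} ord_action;

typedef struct
{
	const char *path;
	ord_action	action;
} ord_entry;

/*
 * qsort comparator over an array of ord_entry pointers: first by action,
 * then by path.  Removals use reverse path order, so "a/b" sorts before "a".
 */
static int
ord_entry_cmp(const void *a, const void *b)
{
	const ord_entry *ea = *(const ord_entry *const *) a;
	const ord_entry *eb = *(const ord_entry *const *) b;

	assert(ea != NULL && eb != NULL);
	if (ea->action != eb->action)
		return ea->action < eb->action ? -1 : 1;
	if (ea->action == ORD_REMOVE)
		return strcmp(eb->path, ea->path);
	return strcmp(ea->path, eb->path);
}
```

Encoding the order in the enum keeps the comparator trivial; the cost is that reordering the enum silently changes execution order, which is why the header comment calls it out.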
diff --git a/src/bin/pg_rewind/libpq_fetch.c b/src/bin/pg_rewind/libpq_fetch.c
index bf4dfc23b96..2fc4a784bdb 100644
--- a/src/bin/pg_rewind/libpq_fetch.c
+++ b/src/bin/pg_rewind/libpq_fetch.c
@@ -465,7 +465,7 @@ libpq_executeFileMap(filemap_t *map)
entry = map->array[i];
/* If this is a relation file, copy the modified blocks */
- execute_pagemap(&entry->pagemap, entry->path);
+ execute_pagemap(&entry->target_pages_to_overwrite, entry->path);
switch (entry->action)
{
@@ -476,15 +476,15 @@ libpq_executeFileMap(filemap_t *map)
case FILE_ACTION_COPY:
/* Truncate the old file out of the way, if any */
open_target_file(entry->path, true);
- fetch_file_range(entry->path, 0, entry->newsize);
+ fetch_file_range(entry->path, 0, entry->source_size);
break;
case FILE_ACTION_TRUNCATE:
- truncate_target_file(entry->path, entry->newsize);
+ truncate_target_file(entry->path, entry->source_size);
break;
case FILE_ACTION_COPY_TAIL:
- fetch_file_range(entry->path, entry->oldsize, entry->newsize);
+ fetch_file_range(entry->path, entry->target_size, entry->source_size);
break;
case FILE_ACTION_REMOVE:
@@ -494,6 +494,10 @@ libpq_executeFileMap(filemap_t *map)
case FILE_ACTION_CREATE:
create_target(entry);
break;
+
+ case FILE_ACTION_UNDECIDED:
+ pg_fatal("no action decided for \"%s\"", entry->path);
+ break;
}
}
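execute_pagemap() and calculate_totals() both walk target_pages_to_overwrite and treat each set bit as one BLCKSZ-sized range at offset blkno * BLCKSZ. The sketch below shows that bitmap-to-byte-range step with a fixed-size bitmap. It is an illustration only: the `sk_` names, the 512-block capacity, and the callback shape are hypothetical, and BLCKSZ is assumed to be the default 8 kB.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define SK_BLCKSZ 8192			/* assumption: default PostgreSQL block size */

/* Hypothetical fixed-capacity page map: one bit per block. */
typedef struct
{
	unsigned char bits[64];		/* room for 512 blocks in this sketch */
} sk_pagemap;

static void
sk_pagemap_init(sk_pagemap *map)
{
	memset(map->bits, 0, sizeof(map->bits));
}

static void
sk_pagemap_add(sk_pagemap *map, uint32_t blkno)
{
	assert(blkno < sizeof(map->bits) * 8);
	map->bits[blkno / 8] |= (unsigned char) (1 << (blkno % 8));
}

/*
 * Visit set bits in block order, hand each block's byte range to 'fetch'
 * (if given), and return the total number of bytes to be fetched.
 */
static uint64_t
sk_pagemap_fetch_ranges(const sk_pagemap *map,
						void (*fetch) (uint64_t offset, uint32_t len, void *arg),
						void *arg)
{
	uint64_t	total = 0;

	for (uint32_t blkno = 0; blkno < sizeof(map->bits) * 8; blkno++)
	{
		if (map->bits[blkno / 8] & (1 << (blkno % 8)))
		{
			uint64_t	offset = (uint64_t) blkno * SK_BLCKSZ;

			if (fetch)
				fetch(offset, SK_BLCKSZ, arg);
			total += SK_BLCKSZ;
		}
	}
	return total;
}
```

Adding the same block twice sets the same bit, so repeated WAL references to one block cost nothing extra.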
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 2229c86f9af..2baeb74ae93 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -436,6 +436,6 @@ extractPageInfo(XLogReaderState *record)
if (forknum != MAIN_FORKNUM)
continue;
- process_block_change(forknum, rnode, blkno);
+ process_target_wal_block_change(forknum, rnode, blkno);
}
}
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index 5a7ab764db4..4760090d06e 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -369,7 +369,7 @@ main(int argc, char **argv)
chkpttli);
/*
- * Build the filemap, by comparing the source and target data directories.
+ * Collect information about all files in the target and source systems.
*/
filemap_create();
if (showprogress)
@@ -390,8 +390,12 @@ main(int argc, char **argv)
pg_log_info("reading WAL in target");
extractPageMap(datadir_target, chkptrec, lastcommontliIndex,
ControlFile_target.checkPoint, restore_command);
- filemap_finalize();
+ /*
+ * We have collected all information we need from both systems. Decide
+ * what to do with each file.
+ */
+ decide_file_actions();
if (showprogress)
calculate_totals();
--
2.20.1
From 4356677c7dfb711e3422920c1d4e53a95a2c5a41 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Thu, 24 Sep 2020 18:00:06 +0300
Subject: [PATCH v3 3/5] pg_rewind: Replace the hybrid list+array data
structure with simplehash.
Now that simplehash can be used in frontend code, let's make use of it.
Reviewed-by: Kyotaro Horiguchi, Soumyadeep Chakraborty
Discussion: https://www.postgresql.org/message-id/0c5b3783-af52-3ee5-f8fa-6e794061f70d%40iki.fi
---
src/bin/pg_rewind/copy_fetch.c | 4 +-
src/bin/pg_rewind/fetch.c | 2 +-
src/bin/pg_rewind/fetch.h | 2 +-
src/bin/pg_rewind/filemap.c | 290 +++++++++++++++-----------------
src/bin/pg_rewind/filemap.h | 67 +++-----
src/bin/pg_rewind/libpq_fetch.c | 4 +-
src/bin/pg_rewind/pg_rewind.c | 14 +-
7 files changed, 175 insertions(+), 208 deletions(-)
diff --git a/src/bin/pg_rewind/copy_fetch.c b/src/bin/pg_rewind/copy_fetch.c
index e4b8ce6aaf4..1cd4449314d 100644
--- a/src/bin/pg_rewind/copy_fetch.c
+++ b/src/bin/pg_rewind/copy_fetch.c
@@ -207,9 +207,9 @@ copy_executeFileMap(filemap_t *map)
file_entry_t *entry;
int i;
- for (i = 0; i < map->narray; i++)
+ for (i = 0; i < map->nentries; i++)
{
- entry = map->array[i];
+ entry = map->entries[i];
execute_pagemap(&entry->target_pages_to_overwrite, entry->path);
switch (entry->action)
diff --git a/src/bin/pg_rewind/fetch.c b/src/bin/pg_rewind/fetch.c
index f18fe5386ed..f41d0f295ea 100644
--- a/src/bin/pg_rewind/fetch.c
+++ b/src/bin/pg_rewind/fetch.c
@@ -37,7 +37,7 @@ fetchSourceFileList(void)
* Fetch all relation data files that are marked in the given data page map.
*/
void
-executeFileMap(void)
+execute_file_actions(filemap_t *filemap)
{
if (datadir_source)
copy_executeFileMap(filemap);
diff --git a/src/bin/pg_rewind/fetch.h b/src/bin/pg_rewind/fetch.h
index 7cf8b6ea090..b20df8b1537 100644
--- a/src/bin/pg_rewind/fetch.h
+++ b/src/bin/pg_rewind/fetch.h
@@ -25,7 +25,7 @@
*/
extern void fetchSourceFileList(void);
extern char *fetchFile(const char *filename, size_t *filesize);
-extern void executeFileMap(void);
+extern void execute_file_actions(filemap_t *filemap);
/* in libpq_fetch.c */
extern void libpqProcessFileList(void);
diff --git a/src/bin/pg_rewind/filemap.c b/src/bin/pg_rewind/filemap.c
index 79e5bfdc7d1..b912e36ad09 100644
--- a/src/bin/pg_rewind/filemap.c
+++ b/src/bin/pg_rewind/filemap.c
@@ -3,6 +3,19 @@
* filemap.c
* A data structure for keeping track of files that have changed.
*
+ * This source file contains the logic to decide what to do with different
+ * kinds of files, and the data structure to support it. Before modifying
+ * anything, pg_rewind collects information about all the files and their
+ * attributes in the target and source data directories. It also scans the
+ * WAL log in the target, and collects information about data blocks that
+ * were changed. All this information is stored in a hash table, using the
+ * file path, relative to the root of the data directory, as the key.
+ *
+ * After collecting all the information required, the decide_file_actions()
+ * function scans the hash table and decides what action needs to be taken
+ * for each file. Finally, it copies the entries into an array and sorts
+ * it in the order in which the actions should be executed.
+ *
* Copyright (c) 2013-2020, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
@@ -14,22 +27,41 @@
#include <unistd.h>
#include "catalog/pg_tablespace_d.h"
+#include "common/hashfn.h"
#include "common/string.h"
#include "datapagemap.h"
#include "filemap.h"
#include "pg_rewind.h"
#include "storage/fd.h"
-filemap_t *filemap = NULL;
+/*
+ * Define a hash table which we can use to store information about the files
+ * found in the source and target data directories.
+ */
+static uint32 hash_string_pointer(const char *s);
+#define SH_PREFIX filehash
+#define SH_ELEMENT_TYPE file_entry_t
+#define SH_KEY_TYPE const char *
+#define SH_KEY path
+#define SH_HASH_KEY(tb, key) hash_string_pointer(key)
+#define SH_EQUAL(tb, a, b) (strcmp(a, b) == 0)
+#define SH_SCOPE static inline
+#define SH_RAW_ALLOCATOR pg_malloc0
+#define SH_DECLARE
+#define SH_DEFINE
+#include "lib/simplehash.h"
+
+#define FILEMAP_INITIAL_SIZE 1000
+
+static filehash_hash *filehash;
static bool isRelDataFile(const char *path);
static char *datasegpath(RelFileNode rnode, ForkNumber forknum,
BlockNumber segno);
-static int path_cmp(const void *a, const void *b);
-static file_entry_t *get_filemap_entry(const char *path, bool create);
+static file_entry_t *insert_filehash_entry(const char *path);
+static file_entry_t *lookup_filehash_entry(const char *path);
static int final_filemap_cmp(const void *a, const void *b);
-static void filemap_list_to_array(filemap_t *map);
static bool check_file_excluded(const char *path, bool is_source);
/*
@@ -131,54 +163,26 @@ static const struct exclude_list_item excludeFiles[] =
};
/*
- * Create a new file map (stored in the global pointer "filemap").
+ * Initialize the hash table for the file map.
*/
void
-filemap_create(void)
+filehash_init(void)
{
- filemap_t *map;
-
- map = pg_malloc(sizeof(filemap_t));
- map->first = map->last = NULL;
- map->nlist = 0;
- map->array = NULL;
- map->narray = 0;
-
- Assert(filemap == NULL);
- filemap = map;
+ filehash = filehash_create(FILEMAP_INITIAL_SIZE, NULL);
}
-/* Look up or create entry for 'path' */
+/* Look up entry for 'path', creating a new one if it doesn't exist */
static file_entry_t *
-get_filemap_entry(const char *path, bool create)
+insert_filehash_entry(const char *path)
{
- filemap_t *map = filemap;
file_entry_t *entry;
- file_entry_t **e;
- file_entry_t key;
- file_entry_t *key_ptr;
-
- if (map->array)
- {
- key.path = (char *) path;
- key_ptr = &key;
- e = bsearch(&key_ptr, map->array, map->narray, sizeof(file_entry_t *),
- path_cmp);
- }
- else
- e = NULL;
+ bool found;
- if (e)
- entry = *e;
- else if (!create)
- entry = NULL;
- else
+ entry = filehash_insert(filehash, path, &found);
+ if (!found)
{
- /* Create a new entry for this file */
- entry = pg_malloc(sizeof(file_entry_t));
entry->path = pg_strdup(path);
entry->isrelfile = isRelDataFile(path);
- entry->action = FILE_ACTION_UNDECIDED;
entry->target_exists = false;
entry->target_type = FILE_TYPE_UNDEFINED;
@@ -192,21 +196,18 @@ get_filemap_entry(const char *path, bool create)
entry->source_size = 0;
entry->source_link_target = NULL;
- entry->next = NULL;
-
- if (map->last)
- {
- map->last->next = entry;
- map->last = entry;
- }
- else
- map->first = map->last = entry;
- map->nlist++;
+ entry->action = FILE_ACTION_UNDECIDED;
}
return entry;
}
+static file_entry_t *
+lookup_filehash_entry(const char *path)
+{
+ return filehash_lookup(filehash, path);
+}
+
/*
* Callback for processing source file list.
*
@@ -220,8 +221,6 @@ process_source_file(const char *path, file_type_t type, size_t size,
{
file_entry_t *entry;
- Assert(filemap->array == NULL);
-
/*
* Pretend that pg_wal is a directory, even if it's really a symlink. We
* don't want to mess with the symlink itself, nor complain if it's a
@@ -238,7 +237,9 @@ process_source_file(const char *path, file_type_t type, size_t size,
pg_fatal("data file \"%s\" in source is not a regular file", path);
/* Remember this source file */
- entry = get_filemap_entry(path, true);
+ entry = insert_filehash_entry(path);
+ if (entry->source_exists)
+ pg_fatal("duplicate source file \"%s\"", path);
entry->source_exists = true;
entry->source_type = type;
entry->source_size = size;
@@ -256,7 +257,6 @@ void
process_target_file(const char *path, file_type_t type, size_t size,
const char *link_target)
{
- filemap_t *map = filemap;
file_entry_t *entry;
/*
@@ -264,21 +264,6 @@ process_target_file(const char *path, file_type_t type, size_t size,
* from the target data folder all paths which have been filtered out from
* the source data folder when processing the source files.
*/
- if (map->array == NULL)
- {
- /* on first call, initialize lookup array */
- if (map->nlist == 0)
- {
- /* should not happen */
- pg_fatal("source file list is empty");
- }
-
- filemap_list_to_array(map);
-
- Assert(map->array != NULL);
-
- qsort(map->array, map->narray, sizeof(file_entry_t *), path_cmp);
- }
/*
* Like in process_source_file, pretend that pg_wal is always a directory.
@@ -287,7 +272,9 @@ process_target_file(const char *path, file_type_t type, size_t size,
type = FILE_TYPE_DIRECTORY;
/* Remember this target file */
- entry = get_filemap_entry(path, true);
+ entry = insert_filehash_entry(path);
+ if (entry->target_exists)
+ pg_fatal("duplicate target file \"%s\"", path);
entry->target_exists = true;
entry->target_type = type;
entry->target_size = size;
@@ -301,7 +288,7 @@ process_target_file(const char *path, file_type_t type, size_t size,
* if so, records it in 'target_pages_to_overwrite' bitmap.
*
* NOTE: All the files on both systems must have already been added to the
- * file map!
+ * hash table!
*/
void
process_target_wal_block_change(ForkNumber forknum, RelFileNode rnode,
@@ -312,47 +299,45 @@ process_target_wal_block_change(ForkNumber forknum, RelFileNode rnode,
BlockNumber blkno_inseg;
int segno;
- Assert(filemap->array);
-
segno = blkno / RELSEG_SIZE;
blkno_inseg = blkno % RELSEG_SIZE;
path = datasegpath(rnode, forknum, segno);
- entry = get_filemap_entry(path, false);
+ entry = lookup_filehash_entry(path);
pfree(path);
+ /*
+ * If the block still exists in both systems, remember it. Otherwise we
+ * can safely ignore it.
+ *
+ * If the block is beyond the EOF in the source system, or the file
+ * doesn't exist in the source at all, we're going to truncate or remove
+ * it in the target anyway. Likewise, if the block no longer exists in
+ * the target, we will copy it over with the "tail" from the source
+ * system anyway.
+ *
+ * It is possible to find WAL for a file that no longer exists on either
+ * system. That means that the relation was dropped later in the target
+ * system, and independently in the source system too, or that it was
+ * created and dropped in the target system and never existed in the
+ * source. Either way, we can safely ignore it.
+ */
if (entry)
{
- int64 end_offset;
-
Assert(entry->isrelfile);
if (entry->target_type != FILE_TYPE_REGULAR)
pg_fatal("unexpected page modification for non-regular file \"%s\"",
entry->path);
- /*
- * If the block beyond the EOF in the source system, no need to
- * remember it now, because we're going to truncate it away from the
- * target anyway. Also no need to remember the block if it's beyond
- * the current EOF in the target system; we will copy it over with the
- * "tail" from the source system, anyway.
- */
- end_offset = (blkno_inseg + 1) * BLCKSZ;
- if (end_offset <= entry->source_size &&
- end_offset <= entry->target_size)
- datapagemap_add(&entry->target_pages_to_overwrite, blkno_inseg);
- }
- else
- {
- /*
- * If we don't have any record of this file in the file map, it means
- * that it's a relation that doesn't exist in the source system. It
- * could exist in the target system; we haven't moved the target-only
- * entries from the linked list to the array yet! But in any case, if
- * it doesn't exist in the source it will be removed from the target
- * too, and we can safely ignore it.
- */
+ if (entry->target_exists && entry->source_exists)
+ {
+ off_t end_offset;
+
+ end_offset = (blkno_inseg + 1) * BLCKSZ;
+ if (end_offset <= entry->source_size && end_offset <= entry->target_size)
+ datapagemap_add(&entry->target_pages_to_overwrite, blkno_inseg);
+ }
}
}
@@ -423,34 +408,6 @@ check_file_excluded(const char *path, bool is_source)
return false;
}
-/*
- * Convert the linked list of entries in map->first/last to the array,
- * map->array.
- */
-static void
-filemap_list_to_array(filemap_t *map)
-{
- int narray;
- file_entry_t *entry,
- *next;
-
- map->array = (file_entry_t **)
- pg_realloc(map->array,
- (map->nlist + map->narray) * sizeof(file_entry_t *));
-
- narray = map->narray;
- for (entry = map->first; entry != NULL; entry = next)
- {
- map->array[narray++] = entry;
- next = entry->next;
- entry->next = NULL;
- }
- Assert(narray == map->nlist + map->narray);
- map->narray = narray;
- map->nlist = 0;
- map->first = map->last = NULL;
-}
-
static const char *
action_to_str(file_action_t action)
{
@@ -478,32 +435,31 @@ action_to_str(file_action_t action)
* Calculate the totals needed for progress reports.
*/
void
-calculate_totals(void)
+calculate_totals(filemap_t *filemap)
{
file_entry_t *entry;
int i;
- filemap_t *map = filemap;
- map->total_size = 0;
- map->fetch_size = 0;
+ filemap->total_size = 0;
+ filemap->fetch_size = 0;
- for (i = 0; i < map->narray; i++)
+ for (i = 0; i < filemap->nentries; i++)
{
- entry = map->array[i];
+ entry = filemap->entries[i];
if (entry->source_type != FILE_TYPE_REGULAR)
continue;
- map->total_size += entry->source_size;
+ filemap->total_size += entry->source_size;
if (entry->action == FILE_ACTION_COPY)
{
- map->fetch_size += entry->source_size;
+ filemap->fetch_size += entry->source_size;
continue;
}
if (entry->action == FILE_ACTION_COPY_TAIL)
- map->fetch_size += (entry->source_size - entry->target_size);
+ filemap->fetch_size += (entry->source_size - entry->target_size);
if (entry->target_pages_to_overwrite.bitmapsize > 0)
{
@@ -512,7 +468,7 @@ calculate_totals(void)
iter = datapagemap_iterate(&entry->target_pages_to_overwrite);
while (datapagemap_next(iter, &blk))
- map->fetch_size += BLCKSZ;
+ filemap->fetch_size += BLCKSZ;
pg_free(iter);
}
@@ -520,15 +476,14 @@ calculate_totals(void)
}
void
-print_filemap(void)
+print_filemap(filemap_t *filemap)
{
- filemap_t *map = filemap;
file_entry_t *entry;
int i;
- for (i = 0; i < map->narray; i++)
+ for (i = 0; i < filemap->nentries; i++)
{
- entry = map->array[i];
+ entry = filemap->entries[i];
if (entry->action != FILE_ACTION_NONE ||
entry->target_pages_to_overwrite.bitmapsize > 0)
{
@@ -650,15 +605,6 @@ datasegpath(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
return path;
}
-static int
-path_cmp(const void *a, const void *b)
-{
- file_entry_t *fa = *((file_entry_t **) a);
- file_entry_t *fb = *((file_entry_t **) b);
-
- return strcmp(fa->path, fb->path);
-}
-
/*
* In the final stage, the filemap is sorted so that removals come last.
* From disk space usage point of view, it would be better to do removals
@@ -833,22 +779,52 @@ decide_file_action(file_entry_t *entry)
/*
* Decide what to do with each file.
+ *
+ * Returns a 'filemap' with the entries in the order that their actions
+ * should be executed.
*/
-void
+filemap_t *
decide_file_actions()
{
int i;
+ filehash_iterator it;
+ file_entry_t *entry;
+ filemap_t *filemap;
- filemap_list_to_array(filemap);
-
- for (i = 0; i < filemap->narray; i++)
+ filehash_start_iterate(filehash, &it);
+ while ((entry = filehash_iterate(filehash, &it)) != NULL)
{
- file_entry_t *entry = filemap->array[i];
-
entry->action = decide_file_action(entry);
}
- /* Sort the actions to the order that they should be performed */
- qsort(filemap->array, filemap->narray, sizeof(file_entry_t *),
+ /*
+ * Turn the hash table into an array, and sort in the order that the
+ * actions should be performed.
+ */
+ filemap = pg_malloc(offsetof(filemap_t, entries) +
+ filehash->members * sizeof(file_entry_t *));
+ filemap->nentries = filehash->members;
+ filehash_start_iterate(filehash, &it);
+ i = 0;
+ while ((entry = filehash_iterate(filehash, &it)) != NULL)
+ {
+ filemap->entries[i++] = entry;
+ }
+
+ qsort(filemap->entries, filemap->nentries, sizeof(file_entry_t *),
final_filemap_cmp);
+
+ return filemap;
+}
+
+
+/*
+ * Helper function for filemap hash table.
+ */
+static uint32
+hash_string_pointer(const char *s)
+{
+ unsigned char *ss = (unsigned char *) s;
+
+ return hash_bytes(ss, strlen(s));
}
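process_target_wal_block_change() turns a relation-wide block number from the WAL into a segment file plus a block within that segment, and then records the block only if it lies before EOF in both systems. The sketch below reproduces that arithmetic in isolation. The `sk_` helpers are hypothetical; RELSEG_SIZE is assumed to be the default 131072 blocks (1 GB at 8 kB), and the path format assumes segment N > 0 is named "<relpath>.N", as datasegpath() does.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define SK_BLCKSZ 8192			/* assumption: default block size */
#define SK_RELSEG_SIZE 131072	/* assumption: blocks per 1 GB segment */

/* Split a relation-wide block number into segment and block-in-segment. */
static void
sk_split_blkno(uint32_t blkno, uint32_t *segno, uint32_t *blkno_inseg)
{
	*segno = blkno / SK_RELSEG_SIZE;
	*blkno_inseg = blkno % SK_RELSEG_SIZE;
}

/* Build "<relpath>" for segment 0 and "<relpath>.<segno>" otherwise. */
static void
sk_segment_path(char *buf, size_t buflen, const char *relpath, uint32_t segno)
{
	assert(buflen > 0);
	if (segno == 0)
		snprintf(buf, buflen, "%s", relpath);
	else
		snprintf(buf, buflen, "%s.%u", relpath, (unsigned) segno);
}

/* Keep the block only if its last byte is before EOF on both sides. */
static bool
sk_block_needs_overwrite(uint32_t blkno_inseg, uint64_t source_size,
						 uint64_t target_size)
{
	uint64_t	end_offset = ((uint64_t) blkno_inseg + 1) * SK_BLCKSZ;

	return end_offset <= source_size && end_offset <= target_size;
}
```

Blocks that fail the check need no entry in the page map: truncation, removal, or the tail copy already covers them.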
diff --git a/src/bin/pg_rewind/filemap.h b/src/bin/pg_rewind/filemap.h
index 73c687e4e34..93e13de26ba 100644
--- a/src/bin/pg_rewind/filemap.h
+++ b/src/bin/pg_rewind/filemap.h
@@ -12,15 +12,6 @@
#include "storage/block.h"
#include "storage/relfilenode.h"
-/*
- * For every file found in the local or remote system, we have a file entry
- * that contains information about the file on both systems. For relation
- * files, there is also a page map that marks pages in the file that were
- * changed in the target after the last common checkpoint. Each entry also
- * contains an 'action' field, which says what we are going to do with the
- * file.
- */
-
/* these enum values are sorted in the order we want actions to be processed */
typedef enum
{
@@ -44,9 +35,21 @@ typedef enum
FILE_TYPE_SYMLINK
} file_type_t;
+/*
+ * For every file found in the local or remote system, we have a file entry
+ * that contains information about the file on both systems. For relation
+ * files, there is also a page map that marks pages in the file that were
+ * changed in the target after the last common checkpoint.
+ *
+ * When gathering information, these are kept in a hash table, private to
+ * filemap.c. decide_file_actions() fills in the 'action' field, sorts all
+ * the entries, and returns them in an array, ready for executing the actions.
+ */
typedef struct file_entry_t
{
- char *path;
+ uint32 status; /* hash status */
+
+ const char *path;
bool isrelfile; /* is it a relation data file? */
/*
@@ -75,44 +78,25 @@ typedef struct file_entry_t
* What will we do to the file?
*/
file_action_t action;
-
- struct file_entry_t *next;
} file_entry_t;
+/*
+ * This contains the final decisions on what to do with each file.
+ * 'entries' array contains an entry for each file, sorted in the order
+ * that their actions should be executed.
+ */
typedef struct filemap_t
{
- /*
- * New entries are accumulated to a linked list, in process_source_file
- * and process_target_file.
- */
- file_entry_t *first;
- file_entry_t *last;
- int nlist; /* number of entries currently in list */
-
- /*
- * After processing all the remote files, the entries in the linked list
- * are moved to this array. After processing local files, too, all the
- * local entries are added to the array by filemap_finalize, and sorted in
- * the final order. After filemap_finalize, all the entries are in the
- * array, and the linked list is empty.
- */
- file_entry_t **array;
- int narray; /* current length of array */
-
- /*
- * Summary information.
- */
+ /* Summary information, filled by calculate_totals() */
uint64 total_size; /* total size of the source cluster */
uint64 fetch_size; /* number of bytes that needs to be copied */
-} filemap_t;
-extern filemap_t *filemap;
-
-extern void filemap_create(void);
-extern void calculate_totals(void);
-extern void print_filemap(void);
+ int nentries; /* size of 'entries' array */
+ file_entry_t *entries[FLEXIBLE_ARRAY_MEMBER];
+} filemap_t;
/* Functions for populating the filemap */
+extern void filehash_init(void);
extern void process_source_file(const char *path, file_type_t type,
size_t size, const char *link_target);
extern void process_target_file(const char *path, file_type_t type,
@@ -120,6 +104,9 @@ extern void process_target_file(const char *path, file_type_t type,
extern void process_target_wal_block_change(ForkNumber forknum,
RelFileNode rnode,
BlockNumber blkno);
-extern void decide_file_actions(void);
+
+extern filemap_t *decide_file_actions(void);
+extern void calculate_totals(filemap_t *filemap);
+extern void print_filemap(filemap_t *filemap);
#endif /* FILEMAP_H */
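The new filemap_t ends in a flexible array member, so decide_file_actions() can allocate the header and all entry pointers in one pg_malloc() sized with offsetof(). The sketch below shows the same pattern with plain malloc() and qsort(). The `fam_` names and the integer action field are hypothetical. For an array member, `map->entries` and `&map->entries` evaluate to the same address, so either works as qsort()'s first argument; the former is the more conventional spelling.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

typedef struct
{
	const char *path;
	int			action;			/* hypothetical: smaller values run first */
} fam_entry;

/* Same shape as the new filemap_t: a count plus a flexible array. */
typedef struct
{
	int			nentries;
	fam_entry  *entries[];		/* C99 flexible array member */
} fam_map;

static int
fam_cmp(const void *a, const void *b)
{
	const fam_entry *ea = *(const fam_entry *const *) a;
	const fam_entry *eb = *(const fam_entry *const *) b;

	if (ea->action != eb->action)
		return ea->action < eb->action ? -1 : 1;
	return strcmp(ea->path, eb->path);
}

/*
 * Copy n entry pointers (e.g. gathered by iterating a hash table) into a
 * single allocation and sort them.  The caller frees the result with free().
 */
static fam_map *
fam_map_build(fam_entry **src, int n)
{
	fam_map    *map = malloc(offsetof(fam_map, entries) + n * sizeof(fam_entry *));

	assert(map != NULL);
	map->nentries = n;
	for (int i = 0; i < n; i++)
		map->entries[i] = src[i];
	qsort(map->entries, map->nentries, sizeof(fam_entry *), fam_cmp);
	return map;
}
```

Only pointers are copied, so the entries themselves stay where the hash table put them.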
diff --git a/src/bin/pg_rewind/libpq_fetch.c b/src/bin/pg_rewind/libpq_fetch.c
index 2fc4a784bdb..16d451ae167 100644
--- a/src/bin/pg_rewind/libpq_fetch.c
+++ b/src/bin/pg_rewind/libpq_fetch.c
@@ -460,9 +460,9 @@ libpq_executeFileMap(filemap_t *map)
PQresultErrorMessage(res));
PQclear(res);
- for (i = 0; i < map->narray; i++)
+ for (i = 0; i < map->nentries; i++)
{
- entry = map->array[i];
+ entry = map->entries[i];
/* If this is a relation file, copy the modified blocks */
execute_pagemap(&entry->target_pages_to_overwrite, entry->path);
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index 4760090d06e..574d7f7163b 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -129,6 +129,7 @@ main(int argc, char **argv)
TimeLineID endtli;
ControlFileData ControlFile_new;
bool writerecoveryconf = false;
+ filemap_t *filemap;
pg_logging_init(argv[0]);
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_rewind"));
@@ -368,13 +369,16 @@ main(int argc, char **argv)
(uint32) (chkptrec >> 32), (uint32) chkptrec,
chkpttli);
+ /* Initialize the hash table to track the status of each file */
+ filehash_init();
+
/*
* Collect information about all files in the target and source systems.
*/
- filemap_create();
if (showprogress)
pg_log_info("reading source file list");
fetchSourceFileList();
+
if (showprogress)
pg_log_info("reading target file list");
traverse_datadir(datadir_target, &process_target_file);
@@ -395,13 +399,13 @@ main(int argc, char **argv)
* We have collected all information we need from both systems. Decide
* what to do with each file.
*/
- decide_file_actions();
+ filemap = decide_file_actions();
if (showprogress)
- calculate_totals();
+ calculate_totals(filemap);
/* this is too verbose even for verbose mode */
if (debug)
- print_filemap();
+ print_filemap(filemap);
/*
* Ok, we're ready to start copying things over.
@@ -421,7 +425,7 @@ main(int argc, char **argv)
* modified the target directory and there is no turning back!
*/
- executeFileMap();
+ execute_file_actions(filemap);
progress_report(true);
--
2.20.1
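The third patch replaces the list+array hybrid with a simplehash table keyed by path, whose element type carries its own `status` word. As a rough illustration of the idea, and not of simplehash.h itself, here is a minimal open-addressing table with linear probing and an insert-or-find call that reports `found`, the pattern insert_filehash_entry() relies on to detect duplicate source or target files. Everything here is a hypothetical stand-in: the `ph_` names, FNV-1a in place of hash_bytes(), and the lack of resizing.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

enum { SLOT_EMPTY = 0, SLOT_IN_USE = 1 };

/* Element carries its own status word, like file_entry_t above. */
typedef struct
{
	uint32_t	status;
	const char *path;			/* caller keeps it alive (real code strdups) */
	bool		source_exists;
	bool		target_exists;
} ph_entry;

typedef struct
{
	uint32_t	size;			/* number of slots, a power of two */
	uint32_t	members;
	ph_entry   *slots;
} ph_table;

/* FNV-1a, swapped in here for PostgreSQL's hash_bytes(). */
static uint32_t
ph_hash(const char *s)
{
	uint32_t	h = 2166136261u;

	for (; *s; s++)
		h = (h ^ (unsigned char) *s) * 16777619u;
	return h;
}

static ph_table *
ph_create(uint32_t size)
{
	ph_table   *tb = malloc(sizeof(ph_table));

	assert(tb != NULL && size > 0 && (size & (size - 1)) == 0);
	tb->size = size;
	tb->members = 0;
	tb->slots = calloc(size, sizeof(ph_entry));	/* all SLOT_EMPTY */
	assert(tb->slots != NULL);
	return tb;
}

/* Return the entry for 'path', creating a zeroed one if absent. */
static ph_entry *
ph_insert(ph_table *tb, const char *path, bool *found)
{
	uint32_t	mask = tb->size - 1;
	uint32_t	i = ph_hash(path) & mask;

	for (;;)
	{
		ph_entry   *e = &tb->slots[i];

		if (e->status == SLOT_EMPTY)
		{
			/* never fill the last slot, so probe loops always terminate */
			assert(tb->members + 1 < tb->size);
			e->status = SLOT_IN_USE;
			e->path = path;
			tb->members++;
			*found = false;
			return e;
		}
		if (strcmp(e->path, path) == 0)
		{
			*found = true;
			return e;
		}
		i = (i + 1) & mask;		/* linear probing */
	}
}

static ph_entry *
ph_lookup(const ph_table *tb, const char *path)
{
	uint32_t	mask = tb->size - 1;
	uint32_t	i = ph_hash(path) & mask;

	for (; tb->slots[i].status == SLOT_IN_USE; i = (i + 1) & mask)
		if (strcmp(tb->slots[i].path, path) == 0)
			return &tb->slots[i];
	return NULL;
}
```

Storing elements inline in the slot array avoids one allocation per file, which is part of what makes simplehash attractive compared with the old malloc-per-entry linked list.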
From 1bf7619917636d690df57f825628799ac55fa19a Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Thu, 24 Sep 2020 19:51:35 +0300
Subject: [PATCH v3 4/5] pg_rewind: Refactor the abstraction to fetch from
local/libpq source.
The copy_executeFileMap() and libpq_executeFileMap() functions contained
basically the same logic, just calling different functions to fetch the
source files. Refactor so that the common logic is in one place,
execute_file_actions().
This makes the abstraction of a "source" server clearer, by introducing
a common abstract class (to borrow the object-oriented programming term)
that represents all the operations that can be done on the source server.
There are two implementations of it: one that fetches via libpq, and
another that fetches from a local directory. This adds some code, but makes
it easier to understand what's going on.
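In C, an "abstract class" like this is usually a struct of function pointers that each implementation embeds as its first member, so generic code can hold a pointer to the base struct. The sketch below illustrates that pattern. It is loosely modeled on the `source->queue_fetch_range(source, path, offset, len)` call quoted earlier in the review thread; the `sk_` type names, `finish_fetch`, and the counting implementation (a test stand-in for the libpq and local-directory sources) are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical base "class": every source fills in these methods. */
typedef struct sk_source sk_source;
struct sk_source
{
	void		(*queue_fetch_range) (sk_source *self, const char *path,
									  long long offset, size_t len);
	void		(*finish_fetch) (sk_source *self);
};

/* One concrete "subclass"; the base struct must be the first member. */
typedef struct
{
	sk_source	common;
	int			nqueued;
	size_t		bytes_queued;
	long long	last_offset;
	int			finished;
} sk_counting_source;

static void
counting_queue_fetch_range(sk_source *self, const char *path,
						   long long offset, size_t len)
{
	sk_counting_source *src = (sk_counting_source *) self;

	(void) path;
	src->nqueued++;
	src->bytes_queued += len;
	src->last_offset = offset;
}

static void
counting_finish_fetch(sk_source *self)
{
	((sk_counting_source *) self)->finished++;
}

/* Generic caller: sees only the base type, as execute_file_actions() would. */
static void
fetch_blocks(sk_source *source, const char *path,
			 const unsigned *blocks, int nblocks, size_t blcksz)
{
	assert(source != NULL);
	for (int i = 0; i < nblocks; i++)
		source->queue_fetch_range(source, path,
								  (long long) blocks[i] * (long long) blcksz,
								  blcksz);
	source->finish_fetch(source);
}
```

Splitting "queue" from "finish" lets a remote implementation batch many small range requests into fewer round trips, while a local implementation can simply copy each range immediately.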
Reviewed-by: Kyotaro Horiguchi, Soumyadeep Chakraborty
Discussion: https://www.postgresql.org/message-id/0c5b3783-af52-3ee5-f8fa-6e794061f70d%40iki.fi
---
src/bin/pg_rewind/Makefile | 5 +-
src/bin/pg_rewind/fetch.c | 60 ---
src/bin/pg_rewind/fetch.h | 44 --
src/bin/pg_rewind/file_ops.c | 133 +++++-
src/bin/pg_rewind/file_ops.h | 3 +
.../{libpq_fetch.c => libpq_source.c} | 395 +++++++++---------
src/bin/pg_rewind/local_source.c | 134 ++++++
src/bin/pg_rewind/pg_rewind.c | 201 +++++++--
src/bin/pg_rewind/pg_rewind.h | 5 -
src/bin/pg_rewind/rewind_source.h | 79 ++++
10 files changed, 697 insertions(+), 362 deletions(-)
delete mode 100644 src/bin/pg_rewind/fetch.c
delete mode 100644 src/bin/pg_rewind/fetch.h
rename src/bin/pg_rewind/{libpq_fetch.c => libpq_source.c} (65%)
create mode 100644 src/bin/pg_rewind/local_source.c
create mode 100644 src/bin/pg_rewind/rewind_source.h
diff --git a/src/bin/pg_rewind/Makefile b/src/bin/pg_rewind/Makefile
index f398c3d8488..9bfde5c087b 100644
--- a/src/bin/pg_rewind/Makefile
+++ b/src/bin/pg_rewind/Makefile
@@ -20,12 +20,11 @@ LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport)
OBJS = \
$(WIN32RES) \
- copy_fetch.o \
datapagemap.o \
- fetch.o \
file_ops.o \
filemap.o \
- libpq_fetch.o \
+ libpq_source.o \
+ local_source.o \
parsexlog.o \
pg_rewind.o \
timeline.o \
diff --git a/src/bin/pg_rewind/fetch.c b/src/bin/pg_rewind/fetch.c
deleted file mode 100644
index f41d0f295ea..00000000000
--- a/src/bin/pg_rewind/fetch.c
+++ /dev/null
@@ -1,60 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * fetch.c
- * Functions for fetching files from a local or remote data dir
- *
- * This file forms an abstraction of getting files from the "source".
- * There are two implementations of this interface: one for copying files
- * from a data directory via normal filesystem operations (copy_fetch.c),
- * and another for fetching files from a remote server via a libpq
- * connection (libpq_fetch.c)
- *
- *
- * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres_fe.h"
-
-#include <sys/stat.h>
-#include <unistd.h>
-
-#include "fetch.h"
-#include "file_ops.h"
-#include "filemap.h"
-#include "pg_rewind.h"
-
-void
-fetchSourceFileList(void)
-{
- if (datadir_source)
- traverse_datadir(datadir_source, &process_source_file);
- else
- libpqProcessFileList();
-}
-
-/*
- * Fetch all relation data files that are marked in the given data page map.
- */
-void
-execute_file_actions(filemap_t *filemap)
-{
- if (datadir_source)
- copy_executeFileMap(filemap);
- else
- libpq_executeFileMap(filemap);
-}
-
-/*
- * Fetch a single file into a malloc'd buffer. The file size is returned
- * in *filesize. The returned buffer is always zero-terminated, which is
- * handy for text files.
- */
-char *
-fetchFile(const char *filename, size_t *filesize)
-{
- if (datadir_source)
- return slurpFile(datadir_source, filename, filesize);
- else
- return libpqGetFile(filename, filesize);
-}
diff --git a/src/bin/pg_rewind/fetch.h b/src/bin/pg_rewind/fetch.h
deleted file mode 100644
index b20df8b1537..00000000000
--- a/src/bin/pg_rewind/fetch.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * fetch.h
- * Fetching data from a local or remote data directory.
- *
- * This file includes the prototypes for functions used to copy files from
- * one data directory to another. The source to copy from can be a local
- * directory (copy method), or a remote PostgreSQL server (libpq fetch
- * method).
- *
- * Copyright (c) 2013-2020, PostgreSQL Global Development Group
- *
- *-------------------------------------------------------------------------
- */
-#ifndef FETCH_H
-#define FETCH_H
-
-#include "access/xlogdefs.h"
-
-#include "filemap.h"
-
-/*
- * Common interface. Calls the copy or libpq method depending on global
- * config options.
- */
-extern void fetchSourceFileList(void);
-extern char *fetchFile(const char *filename, size_t *filesize);
-extern void execute_file_actions(filemap_t *filemap);
-
-/* in libpq_fetch.c */
-extern void libpqProcessFileList(void);
-extern char *libpqGetFile(const char *filename, size_t *filesize);
-extern void libpq_executeFileMap(filemap_t *map);
-
-extern void libpqConnect(const char *connstr);
-extern XLogRecPtr libpqGetCurrentXlogInsertLocation(void);
-
-/* in copy_fetch.c */
-extern void copy_executeFileMap(filemap_t *map);
-
-typedef void (*process_file_callback_t) (const char *path, file_type_t type, size_t size, const char *link_target);
-extern void traverse_datadir(const char *datadir, process_file_callback_t callback);
-
-#endif /* FETCH_H */
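With fetch.c and fetch.h gone, callers no longer go through wrappers that test the global datadir_source on every call. Instead, each source (libpq_source, local_source) embeds a rewind_source struct of function pointers as its first member. pg_rewind.c then calls, for example, source->fetch_file(source, ...). The following is a minimal, self-contained sketch of that pattern, not code from the patch. demo_source, memory_source and init_memory_source() are hypothetical names, and the interface is cut down to two callbacks.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical cut-down interface, modelled on rewind_source. */
typedef struct demo_source
{
    /* Return a malloc'd, zero-terminated file; its length goes in *filesize. */
    char       *(*fetch_file) (struct demo_source *source, const char *path,
                               size_t *filesize);
    void        (*destroy) (struct demo_source *source);
} demo_source;

/* One implementation: serves the same in-memory string for any path. */
typedef struct
{
    demo_source common;         /* must be first, so the casts below are valid */
    const char *content;
} memory_source;

static char *
memory_fetch_file(demo_source *source, const char *path, size_t *filesize)
{
    memory_source *src = (memory_source *) source;
    size_t      len = strlen(src->content);
    char       *buf = malloc(len + 1);

    (void) path;                /* this toy source ignores the path */
    if (buf == NULL)
        return NULL;
    memcpy(buf, src->content, len + 1);     /* copies the terminator too */
    if (filesize)
        *filesize = len;
    return buf;
}

static void
memory_destroy(demo_source *source)
{
    free(source);               /* same address as the memory_source */
}

demo_source *
init_memory_source(const char *content)
{
    memory_source *src = calloc(1, sizeof(memory_source));

    if (src == NULL)
        return NULL;
    src->common.fetch_file = memory_fetch_file;
    src->common.destroy = memory_destroy;
    src->content = content;
    return &src->common;
}
```

Because 'common' is the first member, the generic pointer can be cast back to the implementation's struct. libpq_source.c and local_source.c do the same with casts such as ((libpq_source *) source).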
diff --git a/src/bin/pg_rewind/file_ops.c b/src/bin/pg_rewind/file_ops.c
index ec37d0b2e0d..065368a2208 100644
--- a/src/bin/pg_rewind/file_ops.c
+++ b/src/bin/pg_rewind/file_ops.c
@@ -15,6 +15,7 @@
#include "postgres_fe.h"
#include <sys/stat.h>
+#include <dirent.h>
#include <fcntl.h>
#include <unistd.h>
@@ -35,6 +36,9 @@ static void remove_target_dir(const char *path);
static void create_target_symlink(const char *path, const char *link);
static void remove_target_symlink(const char *path);
+static void recurse_dir(const char *datadir, const char *parentpath,
+ process_file_callback_t callback);
+
/*
* Open a target file for writing. If 'trunc' is true and the file already
* exists, it will be truncated.
@@ -83,7 +87,7 @@ close_target_file(void)
void
write_target_range(char *buf, off_t begin, size_t size)
{
- int writeleft;
+ size_t writeleft;
char *p;
/* update progress report */
@@ -101,7 +105,7 @@ write_target_range(char *buf, off_t begin, size_t size)
p = buf;
while (writeleft > 0)
{
- int writelen;
+ ssize_t writelen;
errno = 0;
writelen = write(dstfd, p, writeleft);
@@ -305,9 +309,6 @@ sync_target_dir(void)
* buffer is actually *filesize + 1. That's handy when reading a text file.
* This function can be used to read binary files as well, you can just
* ignore the zero-terminator in that case.
- *
- * This function is used to implement the fetchFile function in the "fetch"
- * interface (see fetch.c), but is also called directly.
*/
char *
slurpFile(const char *datadir, const char *path, size_t *filesize)
@@ -352,3 +353,125 @@ slurpFile(const char *datadir, const char *path, size_t *filesize)
*filesize = len;
return buffer;
}
+
+/*
+ * Traverse through all files in a data directory, calling 'callback'
+ * for each file.
+ */
+void
+traverse_datadir(const char *datadir, process_file_callback_t callback)
+{
+ recurse_dir(datadir, NULL, callback);
+}
+
+/*
+ * recursive part of traverse_datadir
+ *
+ * parentpath is the current subdirectory's path relative to datadir,
+ * or NULL at the top level.
+ */
+static void
+recurse_dir(const char *datadir, const char *parentpath,
+ process_file_callback_t callback)
+{
+ DIR *xldir;
+ struct dirent *xlde;
+ char fullparentpath[MAXPGPATH];
+
+ if (parentpath)
+ snprintf(fullparentpath, MAXPGPATH, "%s/%s", datadir, parentpath);
+ else
+ snprintf(fullparentpath, MAXPGPATH, "%s", datadir);
+
+ xldir = opendir(fullparentpath);
+ if (xldir == NULL)
+ pg_fatal("could not open directory \"%s\": %m",
+ fullparentpath);
+
+ while (errno = 0, (xlde = readdir(xldir)) != NULL)
+ {
+ struct stat fst;
+ char fullpath[MAXPGPATH * 2];
+ char path[MAXPGPATH * 2];
+
+ if (strcmp(xlde->d_name, ".") == 0 ||
+ strcmp(xlde->d_name, "..") == 0)
+ continue;
+
+ snprintf(fullpath, sizeof(fullpath), "%s/%s", fullparentpath, xlde->d_name);
+
+ if (lstat(fullpath, &fst) < 0)
+ {
+ if (errno == ENOENT)
+ {
+ /*
+ * File doesn't exist anymore. That's OK if the new primary is
+ * running and just removed it. A removed data file should have a
+ * WAL record; any other file couldn't have been critical. Skip it.
+ *
+ * TODO: But complain if we're processing the target dir!
+ */
+ continue;
+ }
+ else
+ pg_fatal("could not stat file \"%s\": %m",
+ fullpath);
+ }
+
+ if (parentpath)
+ snprintf(path, sizeof(path), "%s/%s", parentpath, xlde->d_name);
+ else
+ snprintf(path, sizeof(path), "%s", xlde->d_name);
+
+ if (S_ISREG(fst.st_mode))
+ callback(path, FILE_TYPE_REGULAR, fst.st_size, NULL);
+ else if (S_ISDIR(fst.st_mode))
+ {
+ callback(path, FILE_TYPE_DIRECTORY, 0, NULL);
+ /* recurse to handle subdirectories */
+ recurse_dir(datadir, path, callback);
+ }
+#ifndef WIN32
+ else if (S_ISLNK(fst.st_mode))
+#else
+ else if (pgwin32_is_junction(fullpath))
+#endif
+ {
+#if defined(HAVE_READLINK) || defined(WIN32)
+ char link_target[MAXPGPATH];
+ int len;
+
+ len = readlink(fullpath, link_target, sizeof(link_target));
+ if (len < 0)
+ pg_fatal("could not read symbolic link \"%s\": %m",
+ fullpath);
+ if (len >= sizeof(link_target))
+ pg_fatal("symbolic link \"%s\" target is too long",
+ fullpath);
+ link_target[len] = '\0';
+
+ callback(path, FILE_TYPE_SYMLINK, 0, link_target);
+
+ /*
+ * If it's a symlink within pg_tblspc, we need to recurse into it,
+ * to process all the tablespaces. We also follow a symlink if
+ * it's for pg_wal. Symlinks elsewhere are ignored.
+ */
+ if ((parentpath && strcmp(parentpath, "pg_tblspc") == 0) ||
+ strcmp(path, "pg_wal") == 0)
+ recurse_dir(datadir, path, callback);
+#else
+ pg_fatal("\"%s\" is a symbolic link, but symbolic links are not supported on this platform",
+ fullpath);
+#endif /* HAVE_READLINK */
+ }
+ }
+
+ if (errno)
+ pg_fatal("could not read directory \"%s\": %m",
+ fullparentpath);
+
+ if (closedir(xldir))
+ pg_fatal("could not close directory \"%s\": %m",
+ fullparentpath);
+}
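traverse_datadir() reports each entry to the callback with a path relative to the data directory. It recurses through recurse_dir() and follows symlinks only under pg_tblspc or for pg_wal. The sketch below shows the same opendir()/readdir()/lstat() recursion in plain POSIX C, assuming a POSIX system. It is not code from the patch: walk_dir(), walk_callback and entry_kind are hypothetical names. The sketch returns -1 instead of calling pg_fatal(), skips entries that vanish between readdir() and lstat(), and reports symlinks without following them.

```c
#define _XOPEN_SOURCE 700
#include <dirent.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>

typedef enum { ENTRY_REGULAR, ENTRY_DIRECTORY, ENTRY_SYMLINK } entry_kind;

/* Hypothetical callback type; 'relpath' is relative to the top directory. */
typedef void (*walk_callback) (const char *relpath, entry_kind kind,
                               long long size, void *arg);

/*
 * Walk 'root' recursively. 'parent' is NULL at the top level, otherwise the
 * current subdirectory relative to 'root'. Returns 0 on success, or -1 on
 * error with errno set.
 */
int
walk_dir(const char *root, const char *parent, walk_callback cb, void *arg)
{
    char        dirpath[4096];
    DIR        *dir;
    struct dirent *de;

    if (parent)
        snprintf(dirpath, sizeof(dirpath), "%s/%s", root, parent);
    else
        snprintf(dirpath, sizeof(dirpath), "%s", root);

    if ((dir = opendir(dirpath)) == NULL)
        return -1;

    /* reset errno before each readdir() so a NULL return can be classified */
    while (errno = 0, (de = readdir(dir)) != NULL)
    {
        char        fullpath[8192];
        char        relpath[8192];
        struct stat st;
        int         save_errno;

        if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
            continue;

        snprintf(fullpath, sizeof(fullpath), "%s/%s", dirpath, de->d_name);
        if (parent)
            snprintf(relpath, sizeof(relpath), "%s/%s", parent, de->d_name);
        else
            snprintf(relpath, sizeof(relpath), "%s", de->d_name);

        if (lstat(fullpath, &st) < 0)
        {
            save_errno = errno;
            if (save_errno == ENOENT)
                continue;       /* removed concurrently: skip it */
            closedir(dir);
            errno = save_errno;
            return -1;
        }

        if (S_ISREG(st.st_mode))
            cb(relpath, ENTRY_REGULAR, (long long) st.st_size, arg);
        else if (S_ISDIR(st.st_mode))
        {
            cb(relpath, ENTRY_DIRECTORY, 0, arg);
            if (walk_dir(root, relpath, cb, arg) < 0)
            {
                save_errno = errno;
                closedir(dir);
                errno = save_errno;
                return -1;
            }
        }
        else if (S_ISLNK(st.st_mode))
            cb(relpath, ENTRY_SYMLINK, 0, arg);     /* reported, not followed */
    }

    if (errno != 0)
    {
        int         save_errno = errno;     /* readdir() itself failed */

        closedir(dir);
        errno = save_errno;
        return -1;
    }
    return closedir(dir);
}
```

The recursion passes relative paths downward, so the callback never sees the root prefix. This matches how recurse_dir() builds 'path' separately from 'fullpath'.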
diff --git a/src/bin/pg_rewind/file_ops.h b/src/bin/pg_rewind/file_ops.h
index d8466385cf5..c7630859768 100644
--- a/src/bin/pg_rewind/file_ops.h
+++ b/src/bin/pg_rewind/file_ops.h
@@ -23,4 +23,7 @@ extern void sync_target_dir(void);
extern char *slurpFile(const char *datadir, const char *path, size_t *filesize);
+typedef void (*process_file_callback_t) (const char *path, file_type_t type, size_t size, const char *link_target);
+extern void traverse_datadir(const char *datadir, process_file_callback_t callback);
+
#endif /* FILE_OPS_H */
diff --git a/src/bin/pg_rewind/libpq_fetch.c b/src/bin/pg_rewind/libpq_source.c
similarity index 65%
rename from src/bin/pg_rewind/libpq_fetch.c
rename to src/bin/pg_rewind/libpq_source.c
index 16d451ae167..30294d582ee 100644
--- a/src/bin/pg_rewind/libpq_fetch.c
+++ b/src/bin/pg_rewind/libpq_source.c
@@ -1,7 +1,7 @@
/*-------------------------------------------------------------------------
*
- * libpq_fetch.c
- * Functions for fetching files from a remote server.
+ * libpq_source.c
+ * Functions for fetching files from a remote server via libpq.
*
* Copyright (c) 2013-2020, PostgreSQL Global Development Group
*
@@ -9,21 +9,14 @@
*/
#include "postgres_fe.h"
-#include <sys/stat.h>
-#include <dirent.h>
-#include <fcntl.h>
-#include <unistd.h>
-
#include "catalog/pg_type_d.h"
#include "common/connect.h"
#include "datapagemap.h"
-#include "fetch.h"
#include "file_ops.h"
#include "filemap.h"
#include "pg_rewind.h"
#include "port/pg_bswap.h"
-
-PGconn *conn = NULL;
+#include "rewind_source.h"
/*
* Files are fetched max CHUNKSIZE bytes at a time.
@@ -34,30 +27,71 @@ PGconn *conn = NULL;
*/
#define CHUNKSIZE 1000000
-static void receiveFileChunks(const char *sql);
-static void execute_pagemap(datapagemap_t *pagemap, const char *path);
-static char *run_simple_query(const char *sql);
-static void run_simple_command(const char *sql);
+typedef struct
+{
+ rewind_source common; /* common interface functions */
+
+ PGconn *conn;
+ bool copy_started;
+} libpq_source;
+
+static void init_libpq_conn(PGconn *conn);
+static char *run_simple_query(PGconn *conn, const char *sql);
+static void run_simple_command(PGconn *conn, const char *sql);
+
+/* public interface functions */
+static void libpq_traverse_files(rewind_source *source,
+ process_file_callback_t callback);
+static void libpq_queue_fetch_range(rewind_source *source, const char *path,
+ off_t off, size_t len);
+static void libpq_finish_fetch(rewind_source *source);
+static char *libpq_fetch_file(rewind_source *source, const char *path,
+ size_t *filesize);
+static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source);
+static void libpq_destroy(rewind_source *source);
-void
-libpqConnect(const char *connstr)
+/*
+ * Create a new libpq source.
+ *
+ * The caller has already established the connection, but should not try
+ * to use it while the source is active.
+ */
+rewind_source *
+init_libpq_source(PGconn *conn)
{
- char *str;
- PGresult *res;
+ libpq_source *src;
+
+ init_libpq_conn(conn);
+
+ src = pg_malloc0(sizeof(libpq_source));
+
+ src->common.traverse_files = libpq_traverse_files;
+ src->common.fetch_file = libpq_fetch_file;
+ src->common.queue_fetch_range = libpq_queue_fetch_range;
+ src->common.finish_fetch = libpq_finish_fetch;
+ src->common.get_current_wal_insert_lsn = libpq_get_current_wal_insert_lsn;
+ src->common.destroy = libpq_destroy;
- conn = PQconnectdb(connstr);
- if (PQstatus(conn) == CONNECTION_BAD)
- pg_fatal("could not connect to server: %s",
- PQerrorMessage(conn));
+ src->conn = conn;
- if (showprogress)
- pg_log_info("connected to server");
+ return &src->common;
+}
+
+/*
+ * Initialize a libpq connection for use.
+ */
+static void
+init_libpq_conn(PGconn *conn)
+{
+ PGresult *res;
+ char *str;
/* disable all types of timeouts */
- run_simple_command("SET statement_timeout = 0");
- run_simple_command("SET lock_timeout = 0");
- run_simple_command("SET idle_in_transaction_session_timeout = 0");
+ run_simple_command(conn, "SET statement_timeout = 0");
+ run_simple_command(conn, "SET lock_timeout = 0");
+ run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
+ /* secure search_path */
res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("could not clear search_path: %s",
@@ -70,7 +104,7 @@ libpqConnect(const char *connstr)
* currently because we use a temporary table. Better to check for it
* explicitly than error out, for a better error message.
*/
- str = run_simple_query("SELECT pg_is_in_recovery()");
+ str = run_simple_query(conn, "SELECT pg_is_in_recovery()");
if (strcmp(str, "f") != 0)
pg_fatal("source server must not be in recovery mode");
pg_free(str);
@@ -80,27 +114,19 @@ libpqConnect(const char *connstr)
* a page is modified while we read it with pg_read_binary_file(), and we
* rely on full page images to fix them.
*/
- str = run_simple_query("SHOW full_page_writes");
+ str = run_simple_query(conn, "SHOW full_page_writes");
if (strcmp(str, "on") != 0)
pg_fatal("full_page_writes must be enabled in the source server");
pg_free(str);
-
- /*
- * Although we don't do any "real" updates, we do work with a temporary
- * table. We don't care about synchronous commit for that. It doesn't
- * otherwise matter much, but if the server is using synchronous
- * replication, and replication isn't working for some reason, we don't
- * want to get stuck, waiting for it to start working again.
- */
- run_simple_command("SET synchronous_commit = off");
}
/*
- * Runs a query that returns a single value.
+ * Run a query that returns a single value.
+ *
* The result should be pg_free'd after use.
*/
static char *
-run_simple_query(const char *sql)
+run_simple_query(PGconn *conn, const char *sql)
{
PGresult *res;
char *result;
@@ -123,11 +149,12 @@ run_simple_query(const char *sql)
}
/*
- * Runs a command.
+ * Run a command.
+ *
* In the event of a failure, exit immediately.
*/
static void
-run_simple_command(const char *sql)
+run_simple_command(PGconn *conn, const char *sql)
{
PGresult *res;
@@ -141,17 +168,18 @@ run_simple_command(const char *sql)
}
/*
- * Calls pg_current_wal_insert_lsn() function
+ * Call the pg_current_wal_insert_lsn() function in the remote system.
*/
-XLogRecPtr
-libpqGetCurrentXlogInsertLocation(void)
+static XLogRecPtr
+libpq_get_current_wal_insert_lsn(rewind_source *source)
{
+ PGconn *conn = ((libpq_source *) source)->conn;
XLogRecPtr result;
uint32 hi;
uint32 lo;
char *val;
- val = run_simple_query("SELECT pg_current_wal_insert_lsn()");
+ val = run_simple_query(conn, "SELECT pg_current_wal_insert_lsn()");
if (sscanf(val, "%X/%X", &hi, &lo) != 2)
pg_fatal("unrecognized result \"%s\" for current WAL insert location", val);
@@ -166,9 +194,10 @@ libpqGetCurrentXlogInsertLocation(void)
/*
* Get a list of all files in the data directory.
*/
-void
-libpqProcessFileList(void)
+static void
+libpq_traverse_files(rewind_source *source, process_file_callback_t callback)
{
+ PGconn *conn = ((libpq_source *) source)->conn;
PGresult *res;
const char *sql;
int i;
@@ -246,30 +275,114 @@ libpqProcessFileList(void)
PQclear(res);
}
-/*----
- * Runs a query, which returns pieces of files from the remote source data
- * directory, and overwrites the corresponding parts of target files with
- * the received parts. The result set is expected to be of format:
- *
- * path text -- path in the data directory, e.g "base/1/123"
- * begin int8 -- offset within the file
- * chunk bytea -- file content
- *----
+/*
+ * Queue up a request to fetch a piece of a file from the remote system.
*/
static void
-receiveFileChunks(const char *sql)
+libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off,
+ size_t len)
{
+ libpq_source *src = (libpq_source *) source;
+ uint64 begin = off;
+ uint64 end = off + len;
+
+ /*
+ * On first call, create a temporary table, and start COPYing to it.
+ * We will load it with the list of blocks that we need to fetch.
+ */
+ if (!src->copy_started)
+ {
+ PGresult *res;
+
+ run_simple_command(src->conn, "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4)");
+
+ res = PQexec(src->conn, "COPY fetchchunks FROM STDIN");
+ if (PQresultStatus(res) != PGRES_COPY_IN)
+ pg_fatal("could not send file list: %s",
+ PQresultErrorMessage(res));
+ PQclear(res);
+
+ src->copy_started = true;
+ }
+
+ /*
+ * Write the file range to a temporary table in the server.
+ *
+ * The range is sent to the server as a COPY formatted line, to be inserted
+ * into the 'fetchchunks' temporary table. libpq_finish_fetch() then uses
+ * the temporary table to actually fetch the data.
+ */
+
+ /* Split the range into CHUNKSIZE chunks */
+ while (end - begin > 0)
+ {
+ char linebuf[MAXPGPATH + 23];
+ unsigned int chunklen;
+
+ /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
+ if (end - begin > CHUNKSIZE)
+ chunklen = CHUNKSIZE;
+ else
+ chunklen = (unsigned int) (end - begin);
+
+ snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, chunklen);
+
+ if (PQputCopyData(src->conn, linebuf, strlen(linebuf)) != 1)
+ pg_fatal("could not send COPY data: %s",
+ PQerrorMessage(src->conn));
+
+ begin += chunklen;
+ }
+}
+
+/*
+ * Receive all the queued chunks and write them to the target data directory.
+ */
+static void
+libpq_finish_fetch(rewind_source *source)
+{
+ libpq_source *src = (libpq_source *) source;
PGresult *res;
+ const char *sql;
- if (PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
- pg_fatal("could not send query: %s", PQerrorMessage(conn));
+ if (PQputCopyEnd(src->conn, NULL) != 1)
+ pg_fatal("could not send end-of-COPY: %s",
+ PQerrorMessage(src->conn));
+
+ while ((res = PQgetResult(src->conn)) != NULL)
+ {
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("unexpected result while sending file list: %s",
+ PQresultErrorMessage(res));
+ PQclear(res);
+ }
+
+ /*
+ * We've now copied the list of file ranges that we need to fetch to the
+ * temporary table. Now, actually fetch all of those ranges.
+ */
+ sql =
+ "SELECT path, begin,\n"
+ " pg_read_binary_file(path, begin, len, true) AS chunk\n"
+ "FROM fetchchunks\n";
+
+ if (PQsendQueryParams(src->conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
+ pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
pg_log_debug("getting file chunks");
- if (PQsetSingleRowMode(conn) != 1)
+ if (PQsetSingleRowMode(src->conn) != 1)
pg_fatal("could not set libpq connection to single row mode");
- while ((res = PQgetResult(conn)) != NULL)
+ /*----
+ * The result set is of format:
+ *
+ * path text -- path in the data directory, e.g "base/1/123"
+ * begin int8 -- offset within the file
+ * chunk bytea -- file content
+ *----
+ */
+ while ((res = PQgetResult(src->conn)) != NULL)
{
char *filename;
int filenamelen;
@@ -349,8 +462,8 @@ receiveFileChunks(const char *sql)
continue;
}
- pg_log_debug("received chunk for file \"%s\", offset %lld, size %d",
- filename, (long long int) chunkoff, chunksize);
+ pg_log_debug("received chunk for file \"%s\", offset " INT64_FORMAT ", size %d",
+ filename, chunkoff, chunksize);
open_target_file(filename, false);
@@ -363,28 +476,29 @@ receiveFileChunks(const char *sql)
}
/*
- * Receive a single file as a malloc'd buffer.
+ * Fetch a single file as a malloc'd buffer.
*/
-char *
-libpqGetFile(const char *filename, size_t *filesize)
+static char *
+libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
{
+ PGconn *conn = ((libpq_source *) source)->conn;
PGresult *res;
char *result;
int len;
const char *paramValues[1];
- paramValues[0] = filename;
+ paramValues[0] = path;
res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
1, NULL, paramValues, NULL, NULL, 1);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("could not fetch remote file \"%s\": %s",
- filename, PQresultErrorMessage(res));
+ path, PQresultErrorMessage(res));
/* sanity check the result set */
if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
pg_fatal("unexpected result set while fetching remote file \"%s\"",
- filename);
+ path);
/* Read result to local variables */
len = PQgetlength(res, 0, 0);
@@ -394,7 +508,7 @@ libpqGetFile(const char *filename, size_t *filesize)
PQclear(res);
- pg_log_debug("fetched file \"%s\", length %d", filename, len);
+ pg_log_debug("fetched file \"%s\", length %d", path, len);
if (filesize)
*filesize = len;
@@ -402,142 +516,11 @@ libpqGetFile(const char *filename, size_t *filesize)
}
/*
- * Write a file range to a temporary table in the server.
- *
- * The range is sent to the server as a COPY formatted line, to be inserted
- * into the 'fetchchunks' temporary table. It is used in receiveFileChunks()
- * function to actually fetch the data.
- */
-static void
-fetch_file_range(const char *path, uint64 begin, uint64 end)
-{
- char linebuf[MAXPGPATH + 23];
-
- /* Split the range into CHUNKSIZE chunks */
- while (end - begin > 0)
- {
- unsigned int len;
-
- /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
- if (end - begin > CHUNKSIZE)
- len = CHUNKSIZE;
- else
- len = (unsigned int) (end - begin);
-
- snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
-
- if (PQputCopyData(conn, linebuf, strlen(linebuf)) != 1)
- pg_fatal("could not send COPY data: %s",
- PQerrorMessage(conn));
-
- begin += len;
- }
-}
-
-/*
- * Fetch all changed blocks from remote source data directory.
+ * Close a libpq source.
*/
-void
-libpq_executeFileMap(filemap_t *map)
-{
- file_entry_t *entry;
- const char *sql;
- PGresult *res;
- int i;
-
- /*
- * First create a temporary table, and load it with the blocks that we
- * need to fetch.
- */
- sql = "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4);";
- run_simple_command(sql);
-
- sql = "COPY fetchchunks FROM STDIN";
- res = PQexec(conn, sql);
-
- if (PQresultStatus(res) != PGRES_COPY_IN)
- pg_fatal("could not send file list: %s",
- PQresultErrorMessage(res));
- PQclear(res);
-
- for (i = 0; i < map->nentries; i++)
- {
- entry = map->entries[i];
-
- /* If this is a relation file, copy the modified blocks */
- execute_pagemap(&entry->target_pages_to_overwrite, entry->path);
-
- switch (entry->action)
- {
- case FILE_ACTION_NONE:
- /* nothing else to do */
- break;
-
- case FILE_ACTION_COPY:
- /* Truncate the old file out of the way, if any */
- open_target_file(entry->path, true);
- fetch_file_range(entry->path, 0, entry->source_size);
- break;
-
- case FILE_ACTION_TRUNCATE:
- truncate_target_file(entry->path, entry->source_size);
- break;
-
- case FILE_ACTION_COPY_TAIL:
- fetch_file_range(entry->path, entry->target_size, entry->source_size);
- break;
-
- case FILE_ACTION_REMOVE:
- remove_target(entry);
- break;
-
- case FILE_ACTION_CREATE:
- create_target(entry);
- break;
-
- case FILE_ACTION_UNDECIDED:
- pg_fatal("no action decided for \"%s\"", entry->path);
- break;
- }
- }
-
- if (PQputCopyEnd(conn, NULL) != 1)
- pg_fatal("could not send end-of-COPY: %s",
- PQerrorMessage(conn));
-
- while ((res = PQgetResult(conn)) != NULL)
- {
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pg_fatal("unexpected result while sending file list: %s",
- PQresultErrorMessage(res));
- PQclear(res);
- }
-
- /*
- * We've now copied the list of file ranges that we need to fetch to the
- * temporary table. Now, actually fetch all of those ranges.
- */
- sql =
- "SELECT path, begin,\n"
- " pg_read_binary_file(path, begin, len, true) AS chunk\n"
- "FROM fetchchunks\n";
-
- receiveFileChunks(sql);
-}
-
static void
-execute_pagemap(datapagemap_t *pagemap, const char *path)
+libpq_destroy(rewind_source *source)
{
- datapagemap_iterator_t *iter;
- BlockNumber blkno;
- off_t offset;
-
- iter = datapagemap_iterate(pagemap);
- while (datapagemap_next(iter, &blkno))
- {
- offset = blkno * BLCKSZ;
-
- fetch_file_range(path, offset, offset + BLCKSZ);
- }
- pg_free(iter);
+ pfree(source);
+ /* NOTE: we don't close the connection here, as it was not opened by us. */
}
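libpq_queue_fetch_range() turns each (path, off, len) request into one or more COPY text lines for the fetchchunks table. It caps each piece at CHUNKSIZE bytes, so a single pg_read_binary_file() call never returns more than that. The splitting arithmetic can be checked in isolation. In the sketch below, format_chunk_lines() is a hypothetical helper that writes the lines into a caller-supplied buffer instead of calling PQputCopyData(). It assumes the path contains no tab, newline or backslash; the patch does not escape the path for COPY either.

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define CHUNKSIZE 1000000       /* same value as in libpq_source.c */

/*
 * Write one "path\tbegin\tlen\n" line per chunk of [off, off + len) into
 * 'out' (capacity 'outsize'). Returns the number of lines written, or -1 if
 * 'out' is too small. 'outsize' must be at least 1.
 */
int
format_chunk_lines(const char *path, uint64_t off, uint64_t len,
                   char *out, size_t outsize)
{
    uint64_t    begin = off;
    uint64_t    end = off + len;
    size_t      used = 0;
    int         nlines = 0;

    out[0] = '\0';
    while (end - begin > 0)
    {
        unsigned int chunklen;
        int         n;

        /* Fine as long as CHUNKSIZE fits in an unsigned int */
        if (end - begin > CHUNKSIZE)
            chunklen = CHUNKSIZE;
        else
            chunklen = (unsigned int) (end - begin);

        n = snprintf(out + used, outsize - used, "%s\t%" PRIu64 "\t%u\n",
                     path, begin, chunklen);
        if (n < 0 || (size_t) n >= outsize - used)
            return -1;          /* would not fit */
        used += (size_t) n;
        nlines++;
        begin += chunklen;
    }
    return nlines;
}
```

For example, a 2,500,000-byte range starting at offset 0 becomes three lines, with lengths 1000000, 1000000 and 500000. The new interface takes a length rather than an end offset, which is why the FILE_ACTION_COPY_TAIL case in pg_rewind.c passes source_size - target_size.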
diff --git a/src/bin/pg_rewind/local_source.c b/src/bin/pg_rewind/local_source.c
new file mode 100644
index 00000000000..193a412f9ae
--- /dev/null
+++ b/src/bin/pg_rewind/local_source.c
@@ -0,0 +1,134 @@
+/*-------------------------------------------------------------------------
+ *
+ * local_source.c
+ * Functions for using a local data directory as the source.
+ *
+ * Portions Copyright (c) 2013-2020, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "datapagemap.h"
+#include "file_ops.h"
+#include "filemap.h"
+#include "pg_rewind.h"
+#include "rewind_source.h"
+
+typedef struct
+{
+ rewind_source common; /* common interface functions */
+
+ const char *datadir; /* path to the source data directory */
+} local_source;
+
+static void local_traverse_files(rewind_source *source,
+ process_file_callback_t callback);
+static char *local_fetch_file(rewind_source *source, const char *path,
+ size_t *filesize);
+static void local_fetch_file_range(rewind_source *source, const char *path,
+ off_t off, size_t len);
+static void local_finish_fetch(rewind_source *source);
+static void local_destroy(rewind_source *source);
+
+rewind_source *
+init_local_source(const char *datadir)
+{
+ local_source *src;
+
+ src = pg_malloc0(sizeof(local_source));
+
+ src->common.traverse_files = local_traverse_files;
+ src->common.fetch_file = local_fetch_file;
+ src->common.queue_fetch_range = local_fetch_file_range;
+ src->common.finish_fetch = local_finish_fetch;
+ src->common.get_current_wal_insert_lsn = NULL;
+ src->common.destroy = local_destroy;
+
+ src->datadir = datadir;
+
+ return &src->common;
+}
+
+static void
+local_traverse_files(rewind_source *source, process_file_callback_t callback)
+{
+ traverse_datadir(((local_source *) source)->datadir, callback);
+}
+
+static char *
+local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
+{
+ return slurpFile(((local_source *) source)->datadir, path, filesize);
+}
+
+/*
+ * Copy 'len' bytes of a file from source to target, starting at 'off'.
+ *
+ * The copy happens immediately, so local_finish_fetch() has nothing to do.
+ */
+static void
+local_fetch_file_range(rewind_source *source, const char *path, off_t off,
+ size_t len)
+{
+ const char *datadir = ((local_source *) source)->datadir;
+ PGAlignedBlock buf;
+ char srcpath[MAXPGPATH];
+ int srcfd;
+ off_t begin = off;
+ off_t end = off + len;
+
+ snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
+
+ srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
+ if (srcfd < 0)
+ pg_fatal("could not open source file \"%s\": %m",
+ srcpath);
+
+ if (lseek(srcfd, begin, SEEK_SET) == -1)
+ pg_fatal("could not seek in source file \"%s\": %m", srcpath);
+
+ open_target_file(path, false);
+
+ while (end - begin > 0)
+ {
+ ssize_t readlen;
+ size_t chunklen;
+
+ if (end - begin > sizeof(buf))
+ chunklen = sizeof(buf);
+ else
+ chunklen = end - begin;
+
+ readlen = read(srcfd, buf.data, chunklen);
+
+ if (readlen < 0)
+ pg_fatal("could not read file \"%s\": %m", srcpath);
+ else if (readlen == 0)
+ pg_fatal("unexpected EOF while reading file \"%s\"", srcpath);
+
+ write_target_range(buf.data, begin, readlen);
+ begin += readlen;
+ }
+
+ if (close(srcfd) != 0)
+ pg_fatal("could not close file \"%s\": %m", srcpath);
+}
+
+static void
+local_finish_fetch(rewind_source *source)
+{
+ /*
+ * Nothing to do; local_fetch_file_range() has already copied the data
+ * to the target.
+ */
+}
+
+static void
+local_destroy(rewind_source *source)
+{
+ pfree(source);
+}
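local_fetch_file_range() opens the source file, seeks to 'off', and copies 'len' bytes in BLCKSZ-sized pieces. If read() returns 0, it treats the short source file as an error rather than silently copying less. A standalone POSIX sketch of the same loop follows. It is not the patch's code: copy_range() is a hypothetical name, unrelated to Linux copy_file_range(2). It uses pread()/pwrite() with explicit offsets in place of lseek() plus write_target_range(), and assumes the default 8192-byte block size.

```c
#define _XOPEN_SOURCE 700
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Copy 'len' bytes starting at 'off' from srcfd to the same offset in dstfd.
 * Returns 0 on success, -1 on an I/O error (errno set), or -2 if the source
 * ends before off + len.
 */
int
copy_range(int srcfd, int dstfd, off_t off, size_t len)
{
    char        buf[8192];      /* assumed BLCKSZ-sized buffer */
    off_t       begin = off;
    off_t       end = off + (off_t) len;

    while (end - begin > 0)
    {
        size_t      want;
        ssize_t     nread;
        size_t      done = 0;

        if (end - begin > (off_t) sizeof(buf))
            want = sizeof(buf);
        else
            want = (size_t) (end - begin);

        nread = pread(srcfd, buf, want, begin);
        if (nread < 0)
        {
            if (errno == EINTR)
                continue;       /* interrupted: retry the read */
            return -1;
        }
        if (nread == 0)
            return -2;          /* unexpected EOF in the source */

        /* pwrite() may write less than asked; loop until the piece is out */
        while (done < (size_t) nread)
        {
            ssize_t     nwritten = pwrite(dstfd, buf + done,
                                          (size_t) nread - done,
                                          begin + (off_t) done);

            if (nwritten < 0)
            {
                if (errno == EINTR)
                    continue;   /* interrupted: retry the write */
                return -1;
            }
            done += (size_t) nwritten;
        }
        begin += nread;
    }
    return 0;
}
```

Unlike a seek-then-read loop, pread() leaves the source file offset alone, so the function needs no lseek() error path.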
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index 574d7f7163b..33042035424 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -23,20 +23,25 @@
#include "common/restricted_token.h"
#include "common/string.h"
#include "fe_utils/recovery_gen.h"
-#include "fetch.h"
#include "file_ops.h"
#include "filemap.h"
#include "getopt_long.h"
#include "pg_rewind.h"
+#include "rewind_source.h"
#include "storage/bufpage.h"
static void usage(const char *progname);
+static void perform_rewind(filemap_t *filemap, rewind_source *source,
+ XLogRecPtr chkptrec,
+ TimeLineID chkpttli,
+ XLogRecPtr chkptredo);
+
static void createBackupLabel(XLogRecPtr startpoint, TimeLineID starttli,
XLogRecPtr checkpointloc);
-static void digestControlFile(ControlFileData *ControlFile, char *source,
- size_t size);
+static void digestControlFile(ControlFileData *ControlFile,
+ const char *content, size_t size);
static void getRestoreCommand(const char *argv0);
static void sanityChecks(void);
static void findCommonAncestorTimeline(XLogRecPtr *recptr, int *tliIndex);
@@ -69,6 +74,8 @@ int targetNentries;
uint64 fetch_size;
uint64 fetch_done;
+static PGconn *conn;
+static rewind_source *source;
static void
usage(const char *progname)
@@ -125,9 +132,6 @@ main(int argc, char **argv)
char *buffer;
bool no_ensure_shutdown = false;
bool rewind_needed;
- XLogRecPtr endrec;
- TimeLineID endtli;
- ControlFileData ControlFile_new;
bool writerecoveryconf = false;
filemap_t *filemap;
@@ -269,19 +273,29 @@ main(int argc, char **argv)
atexit(disconnect_atexit);
- /* Connect to remote server */
- if (connstr_source)
- libpqConnect(connstr_source);
-
/*
- * Ok, we have all the options and we're ready to start. Read in all the
- * information we need from both clusters.
+ * Ok, we have all the options and we're ready to start. First, set up
+ * the source: connect to the remote server, or use the local data directory.
*/
- buffer = slurpFile(datadir_target, "global/pg_control", &size);
- digestControlFile(&ControlFile_target, buffer, size);
- pg_free(buffer);
+ if (connstr_source)
+ {
+ conn = PQconnectdb(connstr_source);
+
+ if (PQstatus(conn) == CONNECTION_BAD)
+ pg_fatal("could not connect to server: %s",
+ PQerrorMessage(conn));
+
+ if (showprogress)
+ pg_log_info("connected to server");
+
+ source = init_libpq_source(conn);
+ }
+ else
+ source = init_local_source(datadir_source);
/*
+ * Check the status of the target instance.
+ *
* If the target instance was not cleanly shut down, start and stop the
* target cluster once in single-user mode to enforce recovery to finish,
* ensuring that the cluster can be used by pg_rewind. Note that if
@@ -289,6 +303,10 @@ main(int argc, char **argv)
* need to make sure by themselves that the target cluster is in a clean
* state.
*/
+ buffer = slurpFile(datadir_target, "global/pg_control", &size);
+ digestControlFile(&ControlFile_target, buffer, size);
+ pg_free(buffer);
+
if (!no_ensure_shutdown &&
ControlFile_target.state != DB_SHUTDOWNED &&
ControlFile_target.state != DB_SHUTDOWNED_IN_RECOVERY)
@@ -300,17 +318,20 @@ main(int argc, char **argv)
pg_free(buffer);
}
- buffer = fetchFile("global/pg_control", &size);
+ buffer = source->fetch_file(source, "global/pg_control", &size);
digestControlFile(&ControlFile_source, buffer, size);
pg_free(buffer);
sanityChecks();
/*
+ * Find the common ancestor timeline between the clusters.
+ *
* If both clusters are already on the same timeline, there's nothing to
* do.
*/
- if (ControlFile_target.checkPointCopy.ThisTimeLineID == ControlFile_source.checkPointCopy.ThisTimeLineID)
+ if (ControlFile_target.checkPointCopy.ThisTimeLineID ==
+ ControlFile_source.checkPointCopy.ThisTimeLineID)
{
pg_log_info("source and target cluster are on the same timeline");
rewind_needed = false;
@@ -373,11 +394,11 @@ main(int argc, char **argv)
filehash_init();
/*
- * Collect information about all files in the target and source systems.
+ * Collect information about all files in both data directories.
*/
if (showprogress)
pg_log_info("reading source file list");
- fetchSourceFileList();
+ source->traverse_files(source, &process_source_file);
if (showprogress)
pg_log_info("reading target file list");
@@ -421,11 +442,124 @@ main(int argc, char **argv)
}
/*
- * This is the point of no return. Once we start copying things, we have
- * modified the target directory and there is no turning back!
+ * We have now collected all the information we need from both systems,
+ * and we are ready to start modifying the target directory.
+ *
+ * This is the point of no return. Once we start copying things, there is
+ * no turning back!
*/
+ perform_rewind(filemap, source, chkptrec, chkpttli, chkptredo);
- execute_file_actions(filemap);
+ if (showprogress)
+ pg_log_info("syncing target data directory");
+ sync_target_dir();
+
+ /* Also update the standby configuration, if requested. */
+ if (writerecoveryconf && !dry_run)
+ WriteRecoveryConfig(conn, datadir_target,
+ GenerateRecoveryConfig(conn, NULL));
+
+ /* don't need the source connection anymore */
+ source->destroy(source);
+ if (conn)
+ {
+ PQfinish(conn);
+ conn = NULL;
+ }
+
+ pg_log_info("Done!");
+
+ return 0;
+}
+
+/*
+ * Perform the rewind.
+ *
+ * We have already collected all the information we need from the
+ * target and the source.
+ */
+static void
+perform_rewind(filemap_t *filemap, rewind_source *source,
+ XLogRecPtr chkptrec,
+ TimeLineID chkpttli,
+ XLogRecPtr chkptredo)
+{
+ XLogRecPtr endrec;
+ TimeLineID endtli;
+ ControlFileData ControlFile_new;
+
+ /*
+ * Execute the actions in the file map, fetching data from the source
+ * system as needed.
+ */
+ for (int i = 0; i < filemap->nentries; i++)
+ {
+ file_entry_t *entry = filemap->entries[i];
+
+ /*
+ * If this is a relation file, copy the modified blocks.
+ *
+ * This is in addition to any other changes.
+ */
+ if (entry->target_pages_to_overwrite.bitmapsize > 0)
+ {
+ datapagemap_iterator_t *iter;
+ BlockNumber blkno;
+ off_t offset;
+
+ iter = datapagemap_iterate(&entry->target_pages_to_overwrite);
+ while (datapagemap_next(iter, &blkno))
+ {
+ offset = blkno * BLCKSZ;
+ source->queue_fetch_range(source, entry->path, offset, BLCKSZ);
+ }
+ pg_free(iter);
+ }
+
+ switch (entry->action)
+ {
+ case FILE_ACTION_NONE:
+ /* nothing else to do */
+ break;
+
+ case FILE_ACTION_COPY:
+ /* Truncate the old file out of the way, if any */
+ open_target_file(entry->path, true);
+ source->queue_fetch_range(source, entry->path,
+ 0, entry->source_size);
+ break;
+
+ case FILE_ACTION_TRUNCATE:
+ truncate_target_file(entry->path, entry->source_size);
+ break;
+
+ case FILE_ACTION_COPY_TAIL:
+ source->queue_fetch_range(source, entry->path,
+ entry->target_size,
+ entry->source_size - entry->target_size);
+ break;
+
+ case FILE_ACTION_REMOVE:
+ remove_target(entry);
+ break;
+
+ case FILE_ACTION_CREATE:
+ create_target(entry);
+ break;
+
+ case FILE_ACTION_UNDECIDED:
+ pg_fatal("no action decided for \"%s\"", entry->path);
+ break;
+ }
+ }
+
+ /*
+ * We've now copied the list of file ranges that we need to fetch to the
+ * temporary table. Now, actually fetch all of those ranges.
+ */
+ source->finish_fetch(source);
+
+ close_target_file();
progress_report(true);
@@ -445,7 +579,7 @@ main(int argc, char **argv)
if (connstr_source)
{
- endrec = libpqGetCurrentXlogInsertLocation();
+ endrec = source->get_current_wal_insert_lsn(source);
endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
}
else
@@ -458,18 +592,6 @@ main(int argc, char **argv)
ControlFile_new.state = DB_IN_ARCHIVE_RECOVERY;
if (!dry_run)
update_controlfile(datadir_target, &ControlFile_new, do_sync);
-
- if (showprogress)
- pg_log_info("syncing target data directory");
- sync_target_dir();
-
- if (writerecoveryconf && !dry_run)
- WriteRecoveryConfig(conn, datadir_target,
- GenerateRecoveryConfig(conn, NULL));
-
- pg_log_info("Done!");
-
- return 0;
}
static void
@@ -629,7 +751,7 @@ getTimelineHistory(ControlFileData *controlFile, int *nentries)
/* Get history file from appropriate source */
if (controlFile == &ControlFile_source)
- histfile = fetchFile(path, NULL);
+ histfile = source->fetch_file(source, path, NULL);
else if (controlFile == &ControlFile_target)
histfile = slurpFile(datadir_target, path, NULL);
else
@@ -785,16 +907,17 @@ checkControlFile(ControlFileData *ControlFile)
}
/*
- * Verify control file contents in the buffer src, and copy it to *ControlFile.
+ * Verify control file contents in the buffer 'content', and copy it to *ControlFile.
*/
static void
-digestControlFile(ControlFileData *ControlFile, char *src, size_t size)
+digestControlFile(ControlFileData *ControlFile,
+ const char *content, size_t size)
{
if (size != PG_CONTROL_FILE_SIZE)
pg_fatal("unexpected control file size %d, expected %d",
(int) size, PG_CONTROL_FILE_SIZE);
- memcpy(ControlFile, src, sizeof(ControlFileData));
+ memcpy(ControlFile, content, sizeof(ControlFileData));
/* set and validate WalSegSz */
WalSegSz = ControlFile->xlog_seg_size;
diff --git a/src/bin/pg_rewind/pg_rewind.h b/src/bin/pg_rewind/pg_rewind.h
index 67f90c2a38c..0dc3dbd5255 100644
--- a/src/bin/pg_rewind/pg_rewind.h
+++ b/src/bin/pg_rewind/pg_rewind.h
@@ -20,8 +20,6 @@
/* Configuration options */
extern char *datadir_target;
-extern char *datadir_source;
-extern char *connstr_source;
extern bool showprogress;
extern bool dry_run;
extern bool do_sync;
@@ -31,9 +29,6 @@ extern int WalSegSz;
extern TimeLineHistoryEntry *targetHistory;
extern int targetNentries;
-/* general state */
-extern PGconn *conn;
-
/* Progress counters */
extern uint64 fetch_size;
extern uint64 fetch_done;
diff --git a/src/bin/pg_rewind/rewind_source.h b/src/bin/pg_rewind/rewind_source.h
new file mode 100644
index 00000000000..ec1c160a464
--- /dev/null
+++ b/src/bin/pg_rewind/rewind_source.h
@@ -0,0 +1,79 @@
+/*-------------------------------------------------------------------------
+ *
+ * rewind_source.h
+ * Abstraction for fetching from source server.
+ *
+ * The source server can be either a libpq connection to a live system, or
+ * a local data directory. The 'rewind_source' struct abstracts the
+ * operations to fetch data from the source system, so that the rest of
+ * the code doesn't need to care what kind of a source it's dealing with.
+ *
+ * Copyright (c) 2013-2020, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef REWIND_SOURCE_H
+#define REWIND_SOURCE_H
+
+#include "access/xlogdefs.h"
+#include "file_ops.h"
+#include "filemap.h"
+#include "libpq-fe.h"
+
+typedef struct rewind_source
+{
+ /*
+ * Traverse all files in the source data directory, and call 'callback'
+ * on each file.
+ */
+ void (*traverse_files) (struct rewind_source *,
+ process_file_callback_t callback);
+
+ /*
+ * Fetch a single file into a malloc'd buffer. The file size is returned
+ * in *filesize. The returned buffer is always zero-terminated, which is
+ * handy for text files.
+ */
+ char *(*fetch_file) (struct rewind_source *, const char *path,
+ size_t *filesize);
+
+ /*
+ * Request to fetch (part of) a file in the source system, specified by an
+ * offset and length, and write it to the same offset in the corresponding
+ * target file. The source implementation may queue up the request and
+ * execute it later when convenient. Call finish_fetch() to flush the
+ * queue and execute all requests.
+ */
+ void (*queue_fetch_range) (struct rewind_source *, const char *path,
+ off_t offset, size_t len);
+
+ /*
+ * Execute all requests queued up with queue_fetch_range().
+ */
+ void (*finish_fetch) (struct rewind_source *);
+
+ /*
+ * Get the current WAL insert position in the source system.
+ */
+ XLogRecPtr (*get_current_wal_insert_lsn) (struct rewind_source *);
+
+ /*
+ * Free this rewind_source object.
+ */
+ void (*destroy) (struct rewind_source *);
+
+} rewind_source;
+
+
+/*
+ * Execute all the actions in 'filemap'.
+ */
+extern void execute_file_actions(filemap_t *filemap, rewind_source *source);
+
+/* in libpq_source.c */
+extern rewind_source *init_libpq_source(PGconn *conn);
+
+/* in local_source.c */
+extern rewind_source *init_local_source(const char *datadir);
+
+#endif /* REWIND_SOURCE_H */
--
2.20.1
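A note for anyone reviewing rewind_source.h who is less used to this pattern: the struct of function pointers acts as an interface, and each implementation (libpq_source, and presumably the local one) embeds it as its first member, 'common', so that a rewind_source pointer can be cast back to the concrete type inside the callbacks. Below is a minimal standalone sketch of just that pattern. The demo_* names and the describe() callback are hypothetical and not part of the patch.

```c
#include <assert.h>
#include <stdlib.h>

/*
 * Hypothetical, cut-down version of the rewind_source idea: the interface
 * is a struct of function pointers, and every implementation embeds it as
 * its first member.
 */
typedef struct demo_source
{
	const char *(*describe) (struct demo_source *);
	void		(*destroy) (struct demo_source *);
} demo_source;

typedef struct
{
	demo_source common;			/* must be first */
	const char *datadir;
} demo_local_source;

typedef struct
{
	demo_source common;			/* must be first */
	const char *connstr;
} demo_remote_source;

static const char *
local_describe(demo_source *source)
{
	/* Downcast is safe because 'common' is the first member. */
	demo_local_source *src = (demo_local_source *) source;

	assert(src->datadir != NULL);
	return src->datadir;
}

static const char *
remote_describe(demo_source *source)
{
	demo_remote_source *src = (demo_remote_source *) source;

	assert(src->connstr != NULL);
	return src->connstr;
}

/* Both demo implementations own nothing but the struct itself. */
static void
demo_destroy(demo_source *source)
{
	/* &src->common == src, so this frees the whole allocation. */
	free(source);
}

static demo_source *
init_demo_local_source(const char *datadir)
{
	demo_local_source *src = calloc(1, sizeof(demo_local_source));

	if (src == NULL)
		return NULL;
	src->common.describe = local_describe;
	src->common.destroy = demo_destroy;
	src->datadir = datadir;
	return &src->common;
}

static demo_source *
init_demo_remote_source(const char *connstr)
{
	demo_remote_source *src = calloc(1, sizeof(demo_remote_source));

	if (src == NULL)
		return NULL;
	src->common.describe = remote_describe;
	src->common.destroy = demo_destroy;
	src->connstr = connstr;
	return &src->common;
}
```

Callers then only go through the function pointers, e.g. source->describe(source), which mirrors how main() in the patch switches to source->fetch_file(source, ...) and source->traverse_files(source, ...).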
From 32a460c977384e580d38b58e3c5ac5359a7f56b9 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Thu, 24 Sep 2020 19:35:39 +0300
Subject: [PATCH v3 5/5] Allow pg_rewind to use a standby server as the source
system.
Using a hot standby server as the source has not been possible, because
pg_rewind creates a temporary table in the source system, to hold the
list of file ranges that need to be fetched. Refactor it to queue up the
file fetch requests in pg_rewind's memory, so that the temporary table
is no longer needed.
Also update the logic to compute 'minRecoveryPoint' correctly, when the
source is a standby server.
Reviewed-by: Kyotaro Horiguchi, Soumyadeep Chakraborty
Discussion: https://www.postgresql.org/message-id/0c5b3783-af52-3ee5-f8fa-6e794061f70d%40iki.fi
---
src/bin/pg_rewind/libpq_source.c | 277 ++++++++++++++++++++++---------
src/bin/pg_rewind/pg_rewind.c | 74 +++++++--
2 files changed, 259 insertions(+), 92 deletions(-)
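The in-memory queueing in libpq_queue_fetch_range() boils down to two rules. First, a request that continues the previous one (same path pointer, contiguous offset, previous chunk not yet at MAX_CHUNK_SIZE) is merged into it. Second, whatever remains is split into pieces of at most MAX_CHUNK_SIZE bytes, and the queue is flushed when it reaches MAX_CHUNKS_PER_QUERY entries. Here is a rough standalone sketch of those rules. It uses hypothetical demo_* names and deliberately tiny limits, and its flush function only counts what it would send instead of running the prepared statement.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/types.h>			/* off_t */

/* Hypothetical, deliberately tiny limits so the behavior is easy to see. */
#define DEMO_MAX_CHUNK_SIZE			8
#define DEMO_MAX_CHUNKS_PER_QUERY	4

typedef struct
{
	const char *path;
	off_t		offset;
	size_t		length;
} demo_range;

typedef struct
{
	int			num_requests;
	demo_range	queue[DEMO_MAX_CHUNKS_PER_QUERY];
	int			num_flushes;	/* how many times the queue was "sent" */
	int			num_sent;		/* total ranges "sent" across all flushes */
} demo_queue;

/* Stand-in for process_queued_fetch_requests(): just empties the queue. */
static void
demo_flush(demo_queue *q)
{
	if (q->num_requests == 0)
		return;
	q->num_flushes++;
	q->num_sent += q->num_requests;
	q->num_requests = 0;
}

static void
demo_queue_range(demo_queue *q, const char *path, off_t off, size_t len)
{
	/* Try to extend the previous range if this one continues it. */
	if (q->num_requests > 0)
	{
		demo_range *prev = &q->queue[q->num_requests - 1];

		if (prev->path == path &&
			prev->offset + (off_t) prev->length == off &&
			prev->length < DEMO_MAX_CHUNK_SIZE)
		{
			size_t		room = DEMO_MAX_CHUNK_SIZE - prev->length;
			size_t		thislen = len < room ? len : room;

			prev->length += thislen;
			off += (off_t) thislen;
			len -= thislen;
		}
	}

	/* Split whatever is left into chunks of at most DEMO_MAX_CHUNK_SIZE. */
	while (len > 0)
	{
		size_t		thislen = len < DEMO_MAX_CHUNK_SIZE ? len : DEMO_MAX_CHUNK_SIZE;

		/* If the queue is full, "send" everything queued so far. */
		if (q->num_requests == DEMO_MAX_CHUNKS_PER_QUERY)
			demo_flush(q);
		assert(q->num_requests < DEMO_MAX_CHUNKS_PER_QUERY);

		q->queue[q->num_requests].path = path;
		q->queue[q->num_requests].offset = off;
		q->queue[q->num_requests].length = thislen;
		q->num_requests++;

		off += (off_t) thislen;
		len -= thislen;
	}
}
```

As the XXX comment in the patch says, merging relies on the caller passing the same path pointer for consecutive blocks of the same file. A different pointer for the same name only costs a missed merge, not correctness.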
diff --git a/src/bin/pg_rewind/libpq_source.c b/src/bin/pg_rewind/libpq_source.c
index 30294d582ee..5e0f02911e0 100644
--- a/src/bin/pg_rewind/libpq_source.c
+++ b/src/bin/pg_rewind/libpq_source.c
@@ -14,30 +14,51 @@
#include "datapagemap.h"
#include "file_ops.h"
#include "filemap.h"
+#include "lib/stringinfo.h"
#include "pg_rewind.h"
#include "port/pg_bswap.h"
#include "rewind_source.h"
/*
- * Files are fetched max CHUNKSIZE bytes at a time.
- *
- * (This only applies to files that are copied in whole, or for truncated
- * files where we copy the tail. Relation files, where we know the individual
- * blocks that need to be fetched, are fetched in BLCKSZ chunks.)
+ * Files are fetched MAX_CHUNK_SIZE bytes at a time, and with a
+ * maximum of MAX_CHUNKS_PER_QUERY chunks in a single query.
*/
-#define CHUNKSIZE 1000000
+#define MAX_CHUNK_SIZE (1024 * 1024)
+#define MAX_CHUNKS_PER_QUERY 1000
+
+/* represents the request to fetch a piece of a file from the source */
+typedef struct
+{
+ const char *path; /* path relative to data directory root */
+ off_t offset;
+ size_t length;
+} fetch_range_request;
typedef struct
{
rewind_source common; /* common interface functions */
PGconn *conn;
- bool copy_started;
+
+ /*
+ * Queue of chunks that have been requested with the queue_fetch_range()
+ * function, but have not been fetched from the remote server yet.
+ */
+ int num_requests;
+ fetch_range_request request_queue[MAX_CHUNKS_PER_QUERY];
+
+ /* temporary space for process_queued_fetch_requests() */
+ StringInfoData paths;
+ StringInfoData offsets;
+ StringInfoData lengths;
} libpq_source;
static void init_libpq_conn(PGconn *conn);
static char *run_simple_query(PGconn *conn, const char *sql);
static void run_simple_command(PGconn *conn, const char *sql);
+static void appendArrayEscapedString(StringInfo buf, const char *str);
+
+static void process_queued_fetch_requests(libpq_source *src);
/* public interface functions */
static void libpq_traverse_files(rewind_source *source,
@@ -74,6 +95,10 @@ init_libpq_source(PGconn *conn)
src->conn = conn;
+ initStringInfo(&src->paths);
+ initStringInfo(&src->offsets);
+ initStringInfo(&src->lengths);
+
return &src->common;
}
@@ -91,6 +116,12 @@ init_libpq_conn(PGconn *conn)
run_simple_command(conn, "SET lock_timeout = 0");
run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
+ /*
+ * We don't intend to do any updates. Put the connection in read-only mode
+ * to keep us honest.
+ */
+ run_simple_command(conn, "SET default_transaction_read_only = on");
+
/* secure search_path */
res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -98,17 +129,6 @@ init_libpq_conn(PGconn *conn)
PQresultErrorMessage(res));
PQclear(res);
- /*
- * Check that the server is not in hot standby mode. There is no
- * fundamental reason that couldn't be made to work, but it doesn't
- * currently because we use a temporary table. Better to check for it
- * explicitly than error out, for a better error message.
- */
- str = run_simple_query(conn, "SELECT pg_is_in_recovery()");
- if (strcmp(str, "f") != 0)
- pg_fatal("source server must not be in recovery mode");
- pg_free(str);
-
/*
* Also check that full_page_writes is enabled. We can get torn pages if
* a page is modified while we read it with pg_read_binary_file(), and we
@@ -118,6 +138,18 @@ init_libpq_conn(PGconn *conn)
if (strcmp(str, "on") != 0)
pg_fatal("full_page_writes must be enabled in the source server");
pg_free(str);
+
+ /* Prepare a statement we'll use to fetch files */
+ res = PQprepare(conn, "fetch_chunks_stmt",
+ "SELECT path, begin,\n"
+ " pg_read_binary_file(path, begin, len, true) AS chunk\n"
+ "FROM unnest ($1::text[], $2::int8[], $3::int4[]) as x(path, begin, len)",
+ 3, NULL);
+
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("could not prepare statement to fetch file contents: %s",
+ PQresultErrorMessage(res));
+ PQclear(res);
}
/*
@@ -283,94 +315,128 @@ libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off,
size_t len)
{
libpq_source *src = (libpq_source *) source;
- uint64 begin = off;
- uint64 end = off + len;
/*
- * On first call, create a temporary table, and start COPYing to it.
- * We will load it with the list of blocks that we need to fetch.
+ * Does this request happen to be a continuation of the previous chunk?
+ * If so, merge it with the previous one.
+ *
+ * XXX: We use pointer equality to compare the path. That's good enough
+ * for our purposes; the caller always passes the same pointer for the
+ * same filename. If it didn't, we would fail to merge requests, but it
+ * wouldn't affect correctness.
*/
- if (!src->copy_started)
+ if (src->num_requests > 0)
{
- PGresult *res;
+ fetch_range_request *prev = &src->request_queue[src->num_requests - 1];
- run_simple_command(src->conn, "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4)");
+ if (prev->offset + prev->length == off &&
+ prev->length < MAX_CHUNK_SIZE &&
+ prev->path == path)
+ {
+ /*
+ * Extend the previous request to cover as much of this new request
+ * as possible, without exceeding MAX_CHUNK_SIZE.
+ */
+ size_t thislen;
- res = PQexec(src->conn, "COPY fetchchunks FROM STDIN");
- if (PQresultStatus(res) != PGRES_COPY_IN)
- pg_fatal("could not send file list: %s",
- PQresultErrorMessage(res));
- PQclear(res);
+ thislen = Min(len, MAX_CHUNK_SIZE - prev->length);
+ prev->length += thislen;
- src->copy_started = true;
- }
+ off += thislen;
+ len -= thislen;
- /*
- * Write the file range to a temporary table in the server.
- *
- * The range is sent to the server as a COPY formatted line, to be inserted
- * into the 'fetchchunks' temporary table. The libpq_finish_fetch() uses
- * the temporary table to actually fetch the data.
- */
+ /*
+ * Fall through to create new requests for any remaining 'len' that
+ * didn't fit in the previous chunk.
+ */
+ }
+ }
- /* Split the range into CHUNKSIZE chunks */
- while (end - begin > 0)
+ /* Divide the request into pieces of MAX_CHUNK_SIZE bytes each */
+ while (len > 0)
{
- char linebuf[MAXPGPATH + 23];
- unsigned int len;
+ int32 thislen;
- /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
- if (end - begin > CHUNKSIZE)
- len = CHUNKSIZE;
- else
- len = (unsigned int) (end - begin);
-
- snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
+ /* if the queue is full, perform all the work queued up so far */
+ if (src->num_requests == MAX_CHUNKS_PER_QUERY)
+ process_queued_fetch_requests(src);
- if (PQputCopyData(src->conn, linebuf, strlen(linebuf)) != 1)
- pg_fatal("could not send COPY data: %s",
- PQerrorMessage(src->conn));
+ thislen = Min(len, MAX_CHUNK_SIZE);
+ src->request_queue[src->num_requests].path = path;
+ src->request_queue[src->num_requests].offset = off;
+ src->request_queue[src->num_requests].length = thislen;
+ src->num_requests++;
- begin += len;
+ off += thislen;
+ len -= thislen;
}
}
/*
- * Receive all the queued chunks and write them to the target data directory.
+ * Fetch all the queued chunks and write them to the target data directory.
*/
static void
libpq_finish_fetch(rewind_source *source)
{
- libpq_source *src = (libpq_source *) source;
+ process_queued_fetch_requests((libpq_source *) source);
+}
+
+/*
+ * Receive all the queued chunks and write them to the target data directory.
+ */
+static void
+process_queued_fetch_requests(libpq_source *src)
+{
+ const char *params[3];
PGresult *res;
- const char *sql;
+ int chunkno;
- if (PQputCopyEnd(src->conn, NULL) != 1)
- pg_fatal("could not send end-of-COPY: %s",
- PQerrorMessage(src->conn));
+ if (src->num_requests == 0)
+ return;
- while ((res = PQgetResult(src->conn)) != NULL)
+ pg_log_debug("getting %d file chunks", src->num_requests);
+
+ /*
+ * The prepared statement, 'fetch_chunks_stmt', takes three arrays
+ * with the same length as parameters: paths, offsets and lengths.
+ * Construct the string representations of the parameter arrays.
+ */
+ resetStringInfo(&src->paths);
+ resetStringInfo(&src->offsets);
+ resetStringInfo(&src->lengths);
+
+ appendStringInfoChar(&src->paths, '{');
+ appendStringInfoChar(&src->offsets, '{');
+ appendStringInfoChar(&src->lengths, '{');
+ for (int i = 0; i < src->num_requests; i++)
{
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pg_fatal("unexpected result while sending file list: %s",
- PQresultErrorMessage(res));
- PQclear(res);
+ fetch_range_request *rq = &src->request_queue[i];
+
+ if (i > 0)
+ {
+ appendStringInfoChar(&src->paths, ',');
+ appendStringInfoChar(&src->offsets, ',');
+ appendStringInfoChar(&src->lengths, ',');
+ }
+
+ appendArrayEscapedString(&src->paths, rq->path);
+ appendStringInfo(&src->offsets, INT64_FORMAT, (int64) rq->offset);
+ appendStringInfo(&src->lengths, INT64_FORMAT, (int64) rq->length);
}
+ appendStringInfoChar(&src->paths, '}');
+ appendStringInfoChar(&src->offsets, '}');
+ appendStringInfoChar(&src->lengths, '}');
/*
- * We've now copied the list of file ranges that we need to fetch to the
- * temporary table. Now, actually fetch all of those ranges.
+ * Execute the prepared statement.
*/
- sql =
- "SELECT path, begin,\n"
- " pg_read_binary_file(path, begin, len, true) AS chunk\n"
- "FROM fetchchunks\n";
+ params[0] = src->paths.data;
+ params[1] = src->offsets.data;
+ params[2] = src->lengths.data;
- if (PQsendQueryParams(src->conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
+ if (PQsendQueryPrepared(src->conn, "fetch_chunks_stmt", 3, params, NULL, NULL, 1) != 1)
pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
- pg_log_debug("getting file chunks");
-
if (PQsetSingleRowMode(src->conn) != 1)
pg_fatal("could not set libpq connection to single row mode");
@@ -382,8 +448,10 @@ libpq_finish_fetch(rewind_source *source)
* chunk bytea -- file content
*----
*/
+ chunkno = 0;
while ((res = PQgetResult(src->conn)) != NULL)
{
+ fetch_range_request *rq = &src->request_queue[chunkno];
char *filename;
int filenamelen;
int64 chunkoff;
@@ -404,6 +472,9 @@ libpq_finish_fetch(rewind_source *source)
PQresultErrorMessage(res));
}
+ if (chunkno >= src->num_requests)
+ pg_fatal("received more data chunks than requested");
+
/* sanity check the result set */
if (PQnfields(res) != 3 || PQntuples(res) != 1)
pg_fatal("unexpected result set size while fetching remote files");
@@ -448,9 +519,7 @@ libpq_finish_fetch(rewind_source *source)
* If a file has been deleted on the source, remove it on the target
* as well. Note that multiple unlink() calls may happen on the same
* file if multiple data chunks are associated with it, hence ignore
- * unconditionally anything missing. If this file is not a relation
- * data file, then it has been already truncated when creating the
- * file chunk list at the previous execution of the filemap.
+ * unconditionally anything missing.
*/
if (PQgetisnull(res, 0, 2))
{
@@ -465,6 +534,26 @@ libpq_finish_fetch(rewind_source *source)
pg_log_debug("received chunk for file \"%s\", offset " INT64_FORMAT ", size %d",
filename, chunkoff, chunksize);
+ if (strcmp(filename, rq->path) != 0)
+ {
+ pg_fatal("received data for file \"%s\", when requested for \"%s\"",
+ filename, rq->path);
+ }
+ if (chunkoff != rq->offset)
+ pg_fatal("received data at offset " INT64_FORMAT " of file \"%s\", when requested for offset " INT64_FORMAT,
+ chunkoff, rq->path, (int64) rq->offset);
+
+ /*
+ * We should not receive more data than we requested, or
+ * pg_read_binary_file() messed up. We could receive less, though, if
+ * the file was truncated in the source after we checked its size. That's
+ * OK, there should be a WAL record of the truncation, which will get
+ * replayed when you start the target system for the first time after
+ * pg_rewind has completed.
+ */
+ if (chunksize > rq->length)
+ pg_fatal("received more than requested for file \"%s\"", rq->path);
+
open_target_file(filename, false);
write_target_range(chunk, chunkoff, chunksize);
@@ -472,7 +561,33 @@ libpq_finish_fetch(rewind_source *source)
pg_free(filename);
PQclear(res);
+ chunkno++;
+ }
+ if (chunkno != src->num_requests)
+ pg_fatal("unexpected number of data chunks received");
+
+ src->num_requests = 0;
+}
+
+/*
+ * Escape a string to be used as element in a text array constant
+ */
+static void
+appendArrayEscapedString(StringInfo buf, const char *str)
+{
+ appendStringInfoCharMacro(buf, '\"');
+ while (*str)
+ {
+ char ch = *str;
+
+ if (ch == '"' || ch == '\\')
+ appendStringInfoCharMacro(buf, '\\');
+
+ appendStringInfoCharMacro(buf, ch);
+
+ str++;
}
+ appendStringInfoCharMacro(buf, '\"');
}
/*
@@ -521,6 +636,12 @@ libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
static void
libpq_destroy(rewind_source *source)
{
- pfree(source);
+ libpq_source *src = (libpq_source *) source;
+
+ pfree(src->paths.data);
+ pfree(src->offsets.data);
+ pfree(src->lengths.data);
+ pfree(src);
+
/* NOTE: we don't close the connection here, as it was not opened by us. */
}
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index 33042035424..036f24c332b 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -50,6 +50,7 @@ static void disconnect_atexit(void);
static ControlFileData ControlFile_target;
static ControlFileData ControlFile_source;
+static ControlFileData ControlFile_source_after;
const char *progname;
int WalSegSz;
@@ -487,6 +488,8 @@ perform_rewind(filemap_t *filemap, rewind_source *source,
XLogRecPtr endrec;
TimeLineID endtli;
ControlFileData ControlFile_new;
+ size_t size;
+ char *buffer;
/*
* Execute the actions in the file map, fetching data from the source
@@ -553,40 +556,83 @@ perform_rewind(filemap_t *filemap, rewind_source *source,
}
}
- /*
- * We've now copied the list of file ranges that we need to fetch to the
- * temporary table. Now, actually fetch all of those ranges.
- */
+ /* Complete any remaining range-fetches that we queued up above. */
source->finish_fetch(source);
close_target_file();
progress_report(true);
+ /*
+ * Fetch the control file from the source last. This ensures that the
+ * minRecoveryPoint is up-to-date.
+ */
+ buffer = source->fetch_file(source, "global/pg_control", &size);
+ digestControlFile(&ControlFile_source_after, buffer, size);
+ pg_free(buffer);
+
+ /*
+ * Sanity check: If the source is a local system, the control file should
+ * not have changed since we started.
+ *
+ * XXX: We assume it hasn't been modified, but actually, what could go
+ * wrong? The logic handles a libpq source that's modified concurrently,
+ * why not a local datadir?
+ */
+ if (datadir_source &&
+ memcmp(&ControlFile_source, &ControlFile_source_after,
+ sizeof(ControlFileData)) != 0)
+ {
+ pg_fatal("source system was modified while pg_rewind was running");
+ }
+
if (showprogress)
pg_log_info("creating backup label and updating control file");
createBackupLabel(chkptredo, chkpttli, chkptrec);
/*
* Update control file of target. Make it ready to perform archive
- * recovery when restarting.
+ * recovery when restarting, starting from the last common checkpoint.
*
- * minRecoveryPoint is set to the current WAL insert location in the
- * source server. Like in an online backup, it's important that we recover
- * all the WAL that was generated while we copied the files over.
+ * Like in an online backup, it's important that we replay all the WAL
+ * that was generated while we copied the files over. To enforce that,
+ * set 'minRecoveryPoint' in the control file.
*/
- memcpy(&ControlFile_new, &ControlFile_source, sizeof(ControlFileData));
-
if (connstr_source)
{
- endrec = source->get_current_wal_insert_lsn(source);
- endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
+ if (ControlFile_source_after.state == DB_IN_ARCHIVE_RECOVERY)
+ {
+ /*
+ * Source is a standby server. We must replay to its
+ * minRecoveryPoint.
+ */
+ endrec = ControlFile_source_after.minRecoveryPoint;
+ endtli = ControlFile_source_after.minRecoveryPointTLI;
+ }
+ else
+ {
+ /*
+ * Source is a production, non-standby, server. We must recover up
+ * to the last WAL insert location.
+ */
+ if (ControlFile_source_after.state != DB_IN_PRODUCTION)
+ pg_fatal("source system was in unexpected state at end of rewind");
+
+ endrec = source->get_current_wal_insert_lsn(source);
+ endtli = ControlFile_source_after.checkPointCopy.ThisTimeLineID;
+ }
}
else
{
- endrec = ControlFile_source.checkPoint;
- endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
+ /*
+ * Source is a local data directory. It should've shut down cleanly,
+ * and we must replay to the latest shutdown checkpoint.
+ */
+ endrec = ControlFile_source_after.checkPoint;
+ endtli = ControlFile_source_after.checkPointCopy.ThisTimeLineID;
}
+
+ memcpy(&ControlFile_new, &ControlFile_source_after, sizeof(ControlFileData));
ControlFile_new.minRecoveryPoint = endrec;
ControlFile_new.minRecoveryPointTLI = endtli;
ControlFile_new.state = DB_IN_ARCHIVE_RECOVERY;
--
2.20.1
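process_queued_fetch_requests() passes the paths to the prepared statement as a text array literal. Each path is double-quoted, and any '"' or '\' inside it is backslash-escaped, which is what appendArrayEscapedString() does. Below is a self-contained sketch of building such a literal into a fixed-size buffer. The demo_* names, including the small buffer type standing in for StringInfo, are hypothetical. The -1 "too small" result exists only because this sketch has no growable buffer.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical fixed-size stand-in for StringInfo. */
typedef struct
{
	char	   *data;
	size_t		len;
	size_t		cap;
	int			overflow;		/* set if anything did not fit */
} demo_buf;

static void
demo_putc(demo_buf *b, char c)
{
	/* Always keep room for the terminating NUL. */
	if (b->len + 1 < b->cap)
	{
		b->data[b->len++] = c;
		b->data[b->len] = '\0';
	}
	else
		b->overflow = 1;
}

/* Same quoting rule as appendArrayEscapedString() in the patch. */
static void
demo_append_escaped(demo_buf *b, const char *str)
{
	demo_putc(b, '"');
	for (; *str; str++)
	{
		if (*str == '"' || *str == '\\')
			demo_putc(b, '\\');
		demo_putc(b, *str);
	}
	demo_putc(b, '"');
}

/*
 * Build {"elem1","elem2",...} into 'out'. Returns the literal's length, or
 * -1 if 'outsize' was too small.
 */
static int
demo_build_text_array(char *out, size_t outsize,
					  const char *const *elems, int nelems)
{
	demo_buf	b = {out, 0, outsize, 0};

	assert(outsize > 0);
	out[0] = '\0';
	demo_putc(&b, '{');
	for (int i = 0; i < nelems; i++)
	{
		if (i > 0)
			demo_putc(&b, ',');
		demo_append_escaped(&b, elems[i]);
	}
	demo_putc(&b, '}');

	if (b.overflow)
		return -1;
	assert(strlen(out) == b.len);
	return (int) b.len;
}
```

Quoting every element unconditionally, as the patch does, sidesteps the array parser's special handling of unquoted elements, such as a bare NULL or surrounding whitespace. As a result, only '"' and '\' need escaping.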