On 16/12/2020 00:08, Cary Huang wrote:
The following review has been posted through the commitfest application:
make installcheck-world:  tested, passed
Implements feature:       tested, passed
Spec compliant:           tested, passed
Documentation:            not tested

Hello

The patch seems to do what is described, and the regression and TAP tests are passing.
+       /*
+        * A local source is not expected to change while we're rewinding, so 
check
+        * that we size of the file matches our earlier expectation.
+        */
Is this a typo?

Yep, thanks! Attached is a new patch version, with that fixed and rebased. No other changes.

- Heikki
>From 649ce2ffb7ef390e96dbde9bd7da27a8a3d330d4 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Fri, 22 Jan 2021 15:16:41 +0200
Subject: [PATCH v2 1/1] pg_rewind: Fetch small files according to new size.

There's a race condition if a file changes in the source system after we
have collected the file list. If the file becomes larger, we only fetched
up to its original size. That can easily result in a truncated file.
That's not a problem for relation files, files in pg_xact, etc. because
any actions on them will be replayed from the WAL. However, configuration
files are affected.

This commit mitigates the race condition by fetching small files in
their entirety, even if they have grown. This is not a full fix: we still
trust the original file size for files larger than 1 MB. That should be
enough for configuration files, and doing more than that would require
bigger changes to the chunking logic in libpq_source.c.

That mitigates the race condition if the file is modified between the
original scan of files and copying the file, but there's still a race
condition if a file is changed while it's being copied. That's a much
smaller window, though, and pg_basebackup has the same issue.

I ran into this while playing with pg_auto_failover, which frequently
uses ALTER SYSTEM, which updates postgresql.auto.conf. Often, pg_rewind
would fail because the postgresql.auto.conf file changed concurrently
and a partial version of it was copied to the target. The partial
file would fail to parse, preventing the server from starting up.

Reviewed-by: Cary Huang
Discussion: https://www.postgresql.org/message-id/f67feb24-5833-88cb-1020-19a4a2b83ac7%40iki.fi
---
 src/bin/pg_rewind/libpq_source.c  | 32 +++++++++++++
 src/bin/pg_rewind/local_source.c  | 76 +++++++++++++++++++++++++++----
 src/bin/pg_rewind/pg_rewind.c     |  5 +-
 src/bin/pg_rewind/rewind_source.h | 13 ++++++
 4 files changed, 112 insertions(+), 14 deletions(-)

diff --git a/src/bin/pg_rewind/libpq_source.c b/src/bin/pg_rewind/libpq_source.c
index 86d2adcaee9..ff16add16f5 100644
--- a/src/bin/pg_rewind/libpq_source.c
+++ b/src/bin/pg_rewind/libpq_source.c
@@ -63,6 +63,7 @@ static void process_queued_fetch_requests(libpq_source *src);
 /* public interface functions */
 static void libpq_traverse_files(rewind_source *source,
 								 process_file_callback_t callback);
+static void libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len);
 static void libpq_queue_fetch_range(rewind_source *source, const char *path,
 									off_t off, size_t len);
 static void libpq_finish_fetch(rewind_source *source);
@@ -88,6 +89,7 @@ init_libpq_source(PGconn *conn)
 
 	src->common.traverse_files = libpq_traverse_files;
 	src->common.fetch_file = libpq_fetch_file;
+	src->common.queue_fetch_file = libpq_queue_fetch_file;
 	src->common.queue_fetch_range = libpq_queue_fetch_range;
 	src->common.finish_fetch = libpq_finish_fetch;
 	src->common.get_current_wal_insert_lsn = libpq_get_current_wal_insert_lsn;
@@ -307,6 +309,36 @@ libpq_traverse_files(rewind_source *source, process_file_callback_t callback)
 	PQclear(res);
 }
 
+/*
+ * Queue up a request to fetch a whole file from the remote system.
+ */
+static void
+libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len)
+{
+	/*
+	 * Truncate the target file immediately, and queue a request to fetch it
+	 * from the source. If the file is smaller than MAX_CHUNK_SIZE, we still
+	 * request fetching a full-sized chunk, so that if the file has
+	 * become larger in the source system, after we scanned the source
+	 * directory, we still fetch the whole file. This only works for files up
+	 * to MAX_CHUNK_SIZE, but that's good enough for small configuration files
+	 * and such that are changed every now and then, but not WAL-logged.
+	 * For larger files, we fetch only up to the originally observed size.
+	 *
+	 * Even with that mechanism, there is an inherent race condition if the
+	 * file is modified at the same instant that we're copying it, so that we
+	 * might copy a torn version of the file with one half from the old
+	 * version and another half from the new. But pg_basebackup has the same
+	 * problem, and it hasn't been a problem in practice.
+	 *
+	 * It might seem more natural to truncate the file later, when we receive
+	 * it from the source server, but then we'd need to track which
+	 * fetch-requests are for a whole file.
+	 */
+	open_target_file(path, true);
+	libpq_queue_fetch_range(source, path, 0, Max(len, MAX_CHUNK_SIZE));
+}
+
 /*
  * Queue up a request to fetch a piece of a file from remote system.
  */
diff --git a/src/bin/pg_rewind/local_source.c b/src/bin/pg_rewind/local_source.c
index 9c3491c3fba..1899d1cc4ae 100644
--- a/src/bin/pg_rewind/local_source.c
+++ b/src/bin/pg_rewind/local_source.c
@@ -29,8 +29,10 @@ static void local_traverse_files(rewind_source *source,
 								 process_file_callback_t callback);
 static char *local_fetch_file(rewind_source *source, const char *path,
 							  size_t *filesize);
-static void local_fetch_file_range(rewind_source *source, const char *path,
-								   off_t off, size_t len);
+static void local_queue_fetch_file(rewind_source *source, const char *path,
+								   size_t len);
+static void local_queue_fetch_range(rewind_source *source, const char *path,
+									off_t off, size_t len);
 static void local_finish_fetch(rewind_source *source);
 static void local_destroy(rewind_source *source);
 
@@ -43,7 +45,8 @@ init_local_source(const char *datadir)
 
 	src->common.traverse_files = local_traverse_files;
 	src->common.fetch_file = local_fetch_file;
-	src->common.queue_fetch_range = local_fetch_file_range;
+	src->common.queue_fetch_file = local_queue_fetch_file;
+	src->common.queue_fetch_range = local_queue_fetch_range;
 	src->common.finish_fetch = local_finish_fetch;
 	src->common.get_current_wal_insert_lsn = NULL;
 	src->common.destroy = local_destroy;
@@ -65,12 +68,65 @@ local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
 	return slurpFile(((local_source *) source)->datadir, path, filesize);
 }
 
+/*
+ * Copy a whole file from source to target, replacing the target file.
+ *
+ * 'len' is the expected length of the file; a mismatch is a fatal error.
+ */
+static void
+local_queue_fetch_file(rewind_source *source, const char *path, size_t len)
+{
+	const char *datadir = ((local_source *) source)->datadir;
+	PGAlignedBlock buf;
+	char		srcpath[MAXPGPATH];
+	int			srcfd;
+	size_t		written_len;
+
+	snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
+
+	/* Open source file for reading. */
+	srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
+	if (srcfd < 0)
+		pg_fatal("could not open source file \"%s\": %m",
+				 srcpath);
+
+	/* Truncate and open the target file for writing. */
+	open_target_file(path, true);
+
+	written_len = 0;
+	for (;;)
+	{
+		ssize_t		read_len;
+
+		read_len = read(srcfd, buf.data, sizeof(buf));
+
+		if (read_len < 0)
+			pg_fatal("could not read file \"%s\": %m", srcpath);
+		else if (read_len == 0)
+			break;	/* EOF reached */
+
+		write_target_range(buf.data, written_len, read_len);
+		written_len += read_len;
+	}
+
+	/*
+	 * A local source is not expected to change while we're rewinding, so
+	 * check that the size of the file matches our earlier expectation.
+	 */
+	if (written_len != len)
+		pg_fatal("size of source file \"%s\" changed concurrently: " UINT64_FORMAT " bytes expected, " UINT64_FORMAT " copied",
+				 srcpath, (uint64) len, (uint64) written_len);
+
+	if (close(srcfd) != 0)
+		pg_fatal("could not close file \"%s\": %m", srcpath);
+}
+
 /*
  * Copy a file from source to target, starting at 'off', for 'len' bytes.
  */
 static void
-local_fetch_file_range(rewind_source *source, const char *path, off_t off,
-					   size_t len)
+local_queue_fetch_range(rewind_source *source, const char *path, off_t off,
+						size_t len)
 {
 	const char *datadir = ((local_source *) source)->datadir;
 	PGAlignedBlock buf;
@@ -94,14 +150,14 @@ local_fetch_file_range(rewind_source *source, const char *path, off_t off,
 	while (end - begin > 0)
 	{
 		ssize_t		readlen;
-		size_t		len;
+		size_t		thislen;
 
 		if (end - begin > sizeof(buf))
-			len = sizeof(buf);
+			thislen = sizeof(buf);
 		else
-			len = end - begin;
+			thislen = end - begin;
 
-		readlen = read(srcfd, buf.data, len);
+		readlen = read(srcfd, buf.data, thislen);
 
 		if (readlen < 0)
 			pg_fatal("could not read file \"%s\": %m", srcpath);
@@ -120,7 +176,7 @@ static void
 local_finish_fetch(rewind_source *source)
 {
 	/*
-	 * Nothing to do, local_fetch_file_range() copies the ranges immediately.
+	 * Nothing to do, local_queue_fetch_range() copies the ranges immediately.
 	 */
 }
 
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index 359a6a587cb..9030a1505e3 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -537,10 +537,7 @@ perform_rewind(filemap_t *filemap, rewind_source *source,
 				break;
 
 			case FILE_ACTION_COPY:
-				/* Truncate the old file out of the way, if any */
-				open_target_file(entry->path, true);
-				source->queue_fetch_range(source, entry->path,
-										  0, entry->source_size);
+				source->queue_fetch_file(source, entry->path, entry->source_size);
 				break;
 
 			case FILE_ACTION_TRUNCATE:
diff --git a/src/bin/pg_rewind/rewind_source.h b/src/bin/pg_rewind/rewind_source.h
index 2da92dbff94..799b7c120ea 100644
--- a/src/bin/pg_rewind/rewind_source.h
+++ b/src/bin/pg_rewind/rewind_source.h
@@ -47,6 +47,19 @@ typedef struct rewind_source
 	void		(*queue_fetch_range) (struct rewind_source *, const char *path,
 									  off_t offset, size_t len);
 
+	/*
+	 * Like queue_fetch_range(), but requests replacing the whole local file
+	 * from the source system. 'len' is the expected length of the file,
+	 * although when the source is a live server, the file may change
+	 * concurrently. The implementation is not obliged to copy more than 'len'
+	 * bytes, even if the file is larger. However, to avoid copying a
+	 * truncated version of the file, which can cause trouble if e.g. a
+	 * configuration file is modified concurrently, the implementation should
+	 * try to copy the whole file, even if it's larger than expected.
+	 */
+	void		(*queue_fetch_file) (struct rewind_source *, const char *path,
+									 size_t len);
+
 	/*
 	 * Execute all requests queued up with queue_fetch_range().
 	 */
-- 
2.29.2

Reply via email to