On 6 March 2018 at 09:58, Craig Ringer <cr...@2ndquadrant.com> wrote:

> On 5 March 2018 at 23:25, David Steele <da...@pgmasters.net> wrote:
>
>> Hi Craig,
>>
>> On 1/21/18 5:45 PM, Craig Ringer wrote:
>> > On 6 January 2018 at 08:28, Alvaro Herrera <alvhe...@alvh.no-ip.org
>> > <mailto:alvhe...@alvh.no-ip.org>> wrote:
>> >
>> >     I think this should use ReadDirExtended with an elevel less than
>> >     ERROR, and do nothing.
>> >
>> >     Why have strcmp(.) and strcmp(..)?  These are going to be skipped
>> >     by the comparison to "xid" prefix anyway.  Looks like strcmp
>> >     processing power waste.
>> >
>> >     Please don't use bare sprintf() -- upgrade to snprintf.
>> >
>> >     With this coding, if I put a root-owned file "xidfoo" in a replslot
>> >     directory, it will PANIC the server.  Is that okay?  Why not read
>> >     the file name with sscanf(), since we know the precise format it
>> >     has?  Then we don't need to bother with random crap left around.
>> >     Maybe a good time to put the "xid-%u-lsn-%X-%X.snap" literal in a
>> >     macro?  I guess the rationale is that if you let random people put
>> >     "xidfoo" files in your replication slot dirs, you deserve a PANIC
>> >     anyway, but I'm not sure.
>> >
>> > I'm happy to address those comments.
>> >
>> > The PANIC probably made sense when it was only done on startup, but not
>> > now that it's at runtime.
>> >
>> > The rest is mainly retained from the prior code, but it's a good chance
>> > to make those changes.
>>
>> This patch was marked Waiting on Author last December.  Do you know when
>> you'll have a chance to provide an updated patch?
>>
>> Given that it's a bug fix it would be good to see a patch for this CF,
>> or very soon after.
>>
>>
> Thanks for the reminder. I'll address it today if I can.
>
>
Revised patch attached.

I have _not_ rewritten it to use sscanf yet. I'll do that next, so you can
choose the fewer-changes patch for backpatching if desired.



-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 9125690b7216527a4a388de987de609a86224119 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 5 Dec 2017 13:32:45 +0800
Subject: [PATCH v2] Clean up reorder buffer files when starting logical
 decoding

We could fail to clean up reorder buffer files if the walsender exited due to a
client disconnect, because we'd skip both the normal exit and error paths.
---
 src/backend/replication/logical/reorderbuffer.c | 95 +++++++++++++++----------
 1 file changed, 58 insertions(+), 37 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c72a611a39..87522dd121 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -196,6 +196,7 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 						   char *change);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferCleanSerializedTXNs(const char *slotname);
 
 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
 static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
@@ -212,9 +213,11 @@ static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
 static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
 							  Relation relation, ReorderBufferChange *change);
 
+#define REORDERBUFFER_TXN_FILEPATH_FORMAT "pg_replslot/%s/xid-%u-lsn-%X-%X.snap"
 
 /*
- * Allocate a new ReorderBuffer
+ * Allocate a new ReorderBuffer and clean out any old serialized state from
+ * prior ReorderBuffer instances for the same slot.
  */
 ReorderBuffer *
 ReorderBufferAllocate(void)
@@ -223,6 +226,8 @@ ReorderBufferAllocate(void)
 	HASHCTL		hash_ctl;
 	MemoryContext new_ctx;
 
+	Assert(MyReplicationSlot != NULL);
+
 	/* allocate memory in own context, to have better accountability */
 	new_ctx = AllocSetContextCreate(CurrentMemoryContext,
 									"ReorderBuffer",
@@ -269,6 +274,13 @@ ReorderBufferAllocate(void)
 
 	dlist_init(&buffer->toplevel_by_lsn);
 
+	/*
+	 * Ensure there's no stale data from prior uses of this slot, in case some
+	 * prior exit avoided calling ReorderBufferFree. Failure to do this can
+	 * produce duplicated txns, and it's very cheap if there's nothing there.
+	 */
+	ReorderBufferCleanSerializedTXNs(NameStr(MyReplicationSlot->data.name));
+
 	return buffer;
 }
 
@@ -285,6 +297,9 @@ ReorderBufferFree(ReorderBuffer *rb)
 	 * memory context.
 	 */
 	MemoryContextDelete(context);
+
+	/* Free disk space used by unconsumed reorder buffers */
+	ReorderBufferCleanSerializedTXNs(NameStr(MyReplicationSlot->data.name));
 }
 
 /*
@@ -2070,7 +2085,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 			 * No need to care about TLIs here, only used during a single run,
 			 * so each LSN only maps to a specific WAL record.
 			 */
-			sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
+			sprintf(path, REORDERBUFFER_TXN_FILEPATH_FORMAT,
 					NameStr(MyReplicationSlot->data.name), txn->xid,
 					(uint32) (recptr >> 32), (uint32) recptr);
 
@@ -2316,7 +2331,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			 * No need to care about TLIs here, only used during a single run,
 			 * so each LSN only maps to a specific WAL record.
 			 */
-			sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
+			sprintf(path, REORDERBUFFER_TXN_FILEPATH_FORMAT,
 					NameStr(MyReplicationSlot->data.name), txn->xid,
 					(uint32) (recptr >> 32), (uint32) recptr);
 
@@ -2558,7 +2573,7 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 		XLogSegNoOffsetToRecPtr(cur, 0, recptr, wal_segment_size);
 
-		sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
+		sprintf(path, REORDERBUFFER_TXN_FILEPATH_FORMAT,
 				NameStr(MyReplicationSlot->data.name), txn->xid,
 				(uint32) (recptr >> 32), (uint32) recptr);
 		if (unlink(path) != 0 && errno != ENOENT)
@@ -2568,6 +2583,44 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	}
 }
 
+/*
+ * Remove leftover serialized reorder buffer files ("xid*") from the named
+ * slot's directory, as left behind by a prior crash or decoding session exit.
+ */
+static void
+ReorderBufferCleanSerializedTXNs(const char *slotname)
+{
+	DIR		   *spill_dir;
+	struct dirent *spill_de;
+	struct stat statbuf;
+	char		dirpath[MAXPGPATH + 12];	/* stays intact for dir messages */
+	char		path[MAXPGPATH * 2 + 12];
+
+	snprintf(dirpath, sizeof(dirpath), "pg_replslot/%s", slotname);
+
+	/* we're only handling directories here, skip if it's not ours */
+	if (lstat(dirpath, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
+		return;
+
+	spill_dir = AllocateDir(dirpath);
+	while ((spill_de = ReadDirExtended(spill_dir, dirpath, INFO)) != NULL)
+	{
+		/* only look at names that can be ours; also skips "." and ".." */
+		if (strncmp(spill_de->d_name, "xid", 3) == 0)
+		{
+			snprintf(path, sizeof(path), "pg_replslot/%s/%s",
+					 slotname, spill_de->d_name);
+
+			if (unlink(path) != 0)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
+								path, slotname)));
+		}
+	}
+	FreeDir(spill_dir);
+}
+
 /*
  * Delete all data spilled to disk after we've restarted/crashed. It will be
  * recreated when the respective slots are reused.
@@ -2578,15 +2631,9 @@ StartupReorderBuffer(void)
 	DIR		   *logical_dir;
 	struct dirent *logical_de;
 
-	DIR		   *spill_dir;
-	struct dirent *spill_de;
-
 	logical_dir = AllocateDir("pg_replslot");
 	while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
 	{
-		struct stat statbuf;
-		char		path[MAXPGPATH * 2 + 12];
-
 		if (strcmp(logical_de->d_name, ".") == 0 ||
 			strcmp(logical_de->d_name, "..") == 0)
 			continue;
@@ -2599,33 +2646,7 @@ StartupReorderBuffer(void)
 		 * ok, has to be a surviving logical slot, iterate and delete
 		 * everything starting with xid-*
 		 */
-		sprintf(path, "pg_replslot/%s", logical_de->d_name);
-
-		/* we're only creating directories here, skip if it's not our's */
-		if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
-			continue;
-
-		spill_dir = AllocateDir(path);
-		while ((spill_de = ReadDir(spill_dir, path)) != NULL)
-		{
-			if (strcmp(spill_de->d_name, ".") == 0 ||
-				strcmp(spill_de->d_name, "..") == 0)
-				continue;
-
-			/* only look at names that can be ours */
-			if (strncmp(spill_de->d_name, "xid", 3) == 0)
-			{
-				sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
-						spill_de->d_name);
-
-				if (unlink(path) != 0)
-					ereport(PANIC,
-							(errcode_for_file_access(),
-							 errmsg("could not remove file \"%s\": %m",
-									path)));
-			}
-		}
-		FreeDir(spill_dir);
+		ReorderBufferCleanSerializedTXNs(logical_de->d_name);
 	}
 	FreeDir(logical_dir);
 }
-- 
2.14.3

Reply via email to