Hi all

TL;DR: we should delete the spilled reorder buffer files
(pg_replslot/$SLOTNAME/xid*) at the start of each decoding session.
Otherwise we can accidentally re-use stale reorder buffer contents from
prior runs and $BADNESS happens. StartupReorderBuffer() is not sufficient.


Details:

Petr and I have found a bug in logical decoding where changes get appended
multiple times to serialized reorder buffers. This causes duplicate changes
to be sent downstream (conflicts, ERRORs, etc).

Symptoms are:

* Oversized serialized reorder buffers in pg_replslot. In the case where we
found this problem, there was a 147GB reorder buffer that should only have
been 12GB.

* Downstream/receiver working hard but achieving nothing (pglogical/bdr
with conflict resolution enabled), or failing with unique violations and
other errors (built-in logical replication).

Debugging showed that logical decoding was calling the output plugin many
times with the same set of ReorderBufferChange records. It'd advance
normally, then go back to the start of a page address or similar and go
through them all over again.

Log analysis showed that the downstream had been repeatedly connecting to
the upstream, then ERRORing out, so the upstream logs were full of:

LOG: could not receive data from client: Connection reset by peer
LOG: unexpected EOF on standby connection

Once the downstream error was fixed and replay resumed, the repeated
changes showed up.

The cause appears to be that walsender.c's ProcessRepliesIfAny writes a LOG
for unexpected EOF then calls proc_exit(0). But serialized txn cleanup is
done by ReorderBufferRestoreCleanup, as called by ReorderBufferCleanupTXN,
which is invoked from the PG_CATCH() in ReorderBufferCommit() and on
various normal exits. It won't get called if we proc_exit() without an
ERROR, so we leave stale data lying around.
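
To illustrate the failure mode, here is a minimal standalone sketch (not
PostgreSQL code). It assumes the spill file is opened in append mode, which
is what the "appended multiple times" symptom suggests. spill_change() and
spill_size() are hypothetical helpers, and the file name used in the usage
note is made up for the demo.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/stat.h>
#include <unistd.h>

/*
 * Hypothetical stand-in for spilling one change to a segment file.
 * The file is opened with O_APPEND (an assumption), so anything a prior
 * session left behind is kept and the new data lands after it.
 * Returns 0 on success, -1 on failure.
 */
static int
spill_change(const char *path, const char *change, size_t len)
{
	int			fd;
	ssize_t		written;

	assert(path != NULL);

	fd = open(path, O_CREAT | O_WRONLY | O_APPEND, 0600);
	if (fd < 0)
		return -1;

	written = write(fd, change, len);

	if (close(fd) != 0)
		return -1;

	return (written == (ssize_t) len) ? 0 : -1;
}

/* Size of the spill file in bytes, or -1 if it cannot be stat()ed. */
static long
spill_size(const char *path)
{
	struct stat st;

	if (stat(path, &st) != 0)
		return -1;
	return (long) st.st_size;
}
```

If one "session" calls spill_change() for a change and then goes away
without unlinking the file, and the next session decodes the same change
and calls spill_change() again with the same path, the file ends up holding
the change twice. Restoring from that file would then replay it twice.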

It's not a problem on crash restart because StartupReorderBuffer already
does the required delete.

ReorderBufferSerializeTXN, which spills the txns to disk, doesn't appear to
have any guard to ensure that the segment files don't already exist when it
goes to serialize a snapshot. Adding one there would probably be expensive;
we don't know the last lsn of the txn yet, so to be really safe we'd have
to do a directory listing and scan for any xid-$OURXID-* entries.
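
For a sense of what such a guard would involve, here is a rough standalone
sketch using plain POSIX directory calls rather than the backend's
AllocateDir/ReadDir. count_spill_files_for_xid() is a hypothetical helper,
and the "xid-<xid>-" name prefix is an assumption based on the "xid" prefix
the existing cleanup code matches on.

```c
#include <assert.h>
#include <dirent.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>

/*
 * Count entries in slot_dir whose names start with "xid-<xid>-".
 * The trailing dash keeps xid 12 from matching files for xid 123.
 * Returns the number of matches, or -1 if slot_dir is not a readable
 * directory.
 */
static int
count_spill_files_for_xid(const char *slot_dir, unsigned int xid)
{
	char		prefix[32];
	struct stat statbuf;
	DIR		   *dir;
	struct dirent *de;
	int			matches = 0;

	assert(slot_dir != NULL);

	/* only directories can hold spill files */
	if (stat(slot_dir, &statbuf) != 0 || !S_ISDIR(statbuf.st_mode))
		return -1;

	snprintf(prefix, sizeof(prefix), "xid-%u-", xid);

	dir = opendir(slot_dir);
	if (dir == NULL)
		return -1;

	/* every entry has to be visited; this is the cost mentioned above */
	while ((de = readdir(dir)) != NULL)
	{
		if (strncmp(de->d_name, prefix, strlen(prefix)) == 0)
			matches++;
	}
	closedir(dir);

	return matches;
}
```

A nonzero result before the first spill of a txn would mean stale segments
are present. The scan is linear in the number of directory entries, so
running it on every serialization is where the expense would come from.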

So to fix, I suggest that we should do a slot-specific
StartupReorderBuffer-style deletion when we start a new decoding session
on a slot, per attached patch.

It might be nice to also add a hook on proc exit, so we don't have stale
buffers lying around until the next decoding session, but I didn't want to
add new global state to reorderbuffer.c so I've left that for now.
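
As a rough picture of what an exit hook would look like, here is a
standalone analogy using libc's atexit() in place of the backend's exit
callback machinery. It is not a proposal for the actual implementation.
All names here (spill_path_to_clean, remove_spill_at_exit,
register_spill_cleanup, run_session_and_exit) are hypothetical. Note that
the callback takes no argument, so the path has to live in a static
variable, which is the kind of global state mentioned above.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Global state the exit callback needs; hypothetical. */
static char spill_path_to_clean[1024];

/* Exit callback: remove the registered spill file, if any. */
static void
remove_spill_at_exit(void)
{
	if (spill_path_to_clean[0] != '\0')
		unlink(spill_path_to_clean);
}

/* Remember the path and arrange for cleanup at exit(). 0 on success. */
static int
register_spill_cleanup(const char *path)
{
	int			n;

	assert(path != NULL);

	n = snprintf(spill_path_to_clean, sizeof(spill_path_to_clean), "%s", path);
	if (n < 0 || n >= (int) sizeof(spill_path_to_clean))
	{
		spill_path_to_clean[0] = '\0';
		return -1;
	}
	return atexit(remove_spill_at_exit);
}

/*
 * Simulate a decoding session in a child process: spill something, then
 * leave via exit(0) without going through any error path.
 * Returns the child's exit status, or -1 on failure.
 */
static int
run_session_and_exit(const char *path)
{
	pid_t		pid = fork();
	int			status;

	if (pid < 0)
		return -1;

	if (pid == 0)
	{
		FILE	   *f = fopen(path, "a");

		if (f == NULL)
			_exit(2);
		fputs("change\n", f);
		fclose(f);

		if (register_spill_cleanup(path) != 0)
			_exit(3);
		exit(0);				/* runs the atexit callback */
	}

	if (waitpid(pid, &status, 0) < 0)
		return -1;
	return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

After run_session_and_exit() returns, the spill file is gone even though
the child never hit an error path. A hook like this would only shorten the
window in which stale files sit on disk; it would not run on a hard kill,
so the cleanup at the start of the next decoding session is still needed.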

BTW, while this bug looks similar to
https://www.postgresql.org/message-id/54e4e488-186b-a056-6628-50628e4e4...@lab.ntt.co.jp
"Failed to delete old ReorderBuffer spilled files", it's really quite a
separate issue.

Both this bugfix and the above need backpatching to 9.4.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 0a9b98a857e8de02394dc8aefe365a72e50a222e Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 5 Dec 2017 13:32:45 +0800
Subject: [PATCH v1] Clean up reorder buffer files when starting logical
 decoding

We could fail to clean up reorder buffer files if the walsender exited due to a
client disconnect, because we'd skip both the normal exit and error paths.
---
 src/backend/replication/logical/reorderbuffer.c | 84 +++++++++++++++----------
 1 file changed, 50 insertions(+), 34 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index fa95bab..bc562e2 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -196,6 +196,7 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 						   char *change);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferCleanSerializedTXNs(const char *slotname);
 
 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
 static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
@@ -214,7 +215,8 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
 
 
 /*
- * Allocate a new ReorderBuffer
+ * Allocate a new ReorderBuffer and clean out any old serialized state from
+ * prior ReorderBuffer instances for the same slot.
  */
 ReorderBuffer *
 ReorderBufferAllocate(void)
@@ -223,6 +225,8 @@ ReorderBufferAllocate(void)
 	HASHCTL		hash_ctl;
 	MemoryContext new_ctx;
 
+	Assert(MyReplicationSlot != NULL);
+
 	/* allocate memory in own context, to have better accountability */
 	new_ctx = AllocSetContextCreate(CurrentMemoryContext,
 									"ReorderBuffer",
@@ -266,6 +270,9 @@ ReorderBufferAllocate(void)
 
 	dlist_init(&buffer->toplevel_by_lsn);
 
+	/* Ensure there's no stale data from prior uses of this slot */
+	ReorderBufferCleanSerializedTXNs(NameStr(MyReplicationSlot->data.name));
+
 	return buffer;
 }
 
@@ -2551,6 +2558,47 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
 }
 
 /*
+ * Remove any leftover serialized reorder buffers from a slot directory after a
+ * prior crash or decoding session exit.
+ */
+static void
+ReorderBufferCleanSerializedTXNs(const char *slotname)
+{
+	DIR		   *spill_dir;
+	struct dirent *spill_de;
+	struct stat statbuf;
+	char		path[MAXPGPATH * 2 + 12];
+
+	sprintf(path, "pg_replslot/%s", slotname);
+
+	/* we're only handling directories here, skip if it's not ours */
+	if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
+		return;
+
+	spill_dir = AllocateDir(path);
+	while ((spill_de = ReadDir(spill_dir, path)) != NULL)
+	{
+		if (strcmp(spill_de->d_name, ".") == 0 ||
+			strcmp(spill_de->d_name, "..") == 0)
+			continue;
+
+		/* only look at names that can be ours */
+		if (strncmp(spill_de->d_name, "xid", 3) == 0)
+		{
+			sprintf(path, "pg_replslot/%s/%s", slotname,
+					spill_de->d_name);
+
+			if (unlink(path) != 0)
+				ereport(PANIC,
+						(errcode_for_file_access(),
+						 errmsg("could not remove file \"%s\": %m",
+								path)));
+		}
+	}
+	FreeDir(spill_dir);
+}
+
+/*
  * Delete all data spilled to disk after we've restarted/crashed. It will be
  * recreated when the respective slots are reused.
  */
@@ -2560,15 +2608,9 @@ StartupReorderBuffer(void)
 	DIR		   *logical_dir;
 	struct dirent *logical_de;
 
-	DIR		   *spill_dir;
-	struct dirent *spill_de;
-
 	logical_dir = AllocateDir("pg_replslot");
 	while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
 	{
-		struct stat statbuf;
-		char		path[MAXPGPATH * 2 + 12];
-
 		if (strcmp(logical_de->d_name, ".") == 0 ||
 			strcmp(logical_de->d_name, "..") == 0)
 			continue;
@@ -2581,33 +2623,7 @@ StartupReorderBuffer(void)
 		 * ok, has to be a surviving logical slot, iterate and delete
 		 * everything starting with xid-*
 		 */
-		sprintf(path, "pg_replslot/%s", logical_de->d_name);
-
-		/* we're only creating directories here, skip if it's not our's */
-		if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
-			continue;
-
-		spill_dir = AllocateDir(path);
-		while ((spill_de = ReadDir(spill_dir, path)) != NULL)
-		{
-			if (strcmp(spill_de->d_name, ".") == 0 ||
-				strcmp(spill_de->d_name, "..") == 0)
-				continue;
-
-			/* only look at names that can be ours */
-			if (strncmp(spill_de->d_name, "xid", 3) == 0)
-			{
-				sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
-						spill_de->d_name);
-
-				if (unlink(path) != 0)
-					ereport(PANIC,
-							(errcode_for_file_access(),
-							 errmsg("could not remove file \"%s\": %m",
-									path)));
-			}
-		}
-		FreeDir(spill_dir);
+		ReorderBufferCleanSerializedTXNs(logical_de->d_name);
 	}
 	FreeDir(logical_dir);
 }
-- 
2.9.5
