Hi all TL;DR: we should delete pg_replslot/$SLOTNAME/* at the start of each decoding session or we can accidentally re-use stale reorder buffer contents from prior runs and $BADNESS happens. StartupReorderBuffer() is not sufficient.
Details: Petr and I have found a bug in logical decoding where changes get appended multiple times to serialized reorder buffers. This causes duplicate changes sent to downstream (conflicts, ERRORs, etc). Symptoms are: * Oversized serialized reorder buffers in pg_replslot. In the case we found this problem in, there was a 147GB reorder buffer that should only have been 12GB. * Downstream/receiver working hard but achieving nothing (pglogical/bdr with conflict resolution enabled), or failing with unique violations and other errors (built-in logical replication) Debugging showed that logical decoding was calling the output plugin many times with the same set of ReorderBufferChange records. It'd advance normally, then go back to the start of a page address or similar and go through them all all over again. Log analysis showed that the downstream had been repeatedly connecting to the upstream, then ERRORing out, so the upstream logs were full of LOG: could not receive data from client: Connection reset by peer LOG: unexpected EOF on standby connection When the downstream error was fixed, the repeated changes were seen. The cause appears to be that walsender.c's ProcessRepliesIfAny writes a LOG for unexpected EOF then calls proc_exit(0). But serialized txn cleanup is done by ReorderBufferRestoreCleanup, as called by ReorderBufferCleanupTXN, which is invoked from the PG_CATCH() in ReorderBufferCommit() and on various normal exits. It won't get called if we proc_exit() without an ERROR, so we leave stale data lying around. It's not a problem on crash restart because StartupReorderBuffer already does the required delete. ReorderBufferSerializeTXN, which spills the txns to disk, doesn't appear to have any guard to ensure that the segment files don't already exist when it goes to serialize a snapshot. Adding one there would probably be expensive; we don't know the last lsn of the txn yet, so to be really safe we'd have to do a directory listing and scan for any txn-$OURXID-* entries. So to fix, I suggest that we should do a slot-specific StartupReorderBuffer-style deletion when we start a new decoding session on a slot, per attached patch. It might be nice to also add a hook on proc exit, so we don't have stale buffers lying around until next decoding session, but I didn't want to add new global state to reorderbuffer.c so I've left that for now. BTW, while this bug looks similar to https://www.postgresql.org/message-id/54e4e488-186b-a056-6628-50628e4e4...@lab.ntt.co.jp "Failed to delete old ReorderBuffer spilled files" it's really quite a separate issue. Both this bugfix and the above need backpatching to 9.4. -- Craig Ringer http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Training & Services
From 0a9b98a857e8de02394dc8aefe365a72e50a222e Mon Sep 17 00:00:00 2001 From: Craig Ringer <cr...@2ndquadrant.com> Date: Tue, 5 Dec 2017 13:32:45 +0800 Subject: [PATCH v1] Clean up reorder buffer files when starting logical decoding We could fail to clean up reorder buffer files if the walsender exited due to a client disconnect, because we'd skip both the normal exit and error paths. --- src/backend/replication/logical/reorderbuffer.c | 84 +++++++++++++++---------- 1 file changed, 50 insertions(+), 34 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index fa95bab..bc562e2 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -196,6 +196,7 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferCleanSerializedTXNs(const char *slotname); static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap); static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, @@ -214,7 +215,8 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t /* - * Allocate a new ReorderBuffer + * Allocate a new ReorderBuffer and clean out any old serialized state from + * prior ReorderBuffer instances for the same slot. */ ReorderBuffer * ReorderBufferAllocate(void) @@ -223,6 +225,8 @@ ReorderBufferAllocate(void) HASHCTL hash_ctl; MemoryContext new_ctx; + Assert(MyReplicationSlot != NULL); + /* allocate memory in own context, to have better accountability */ new_ctx = AllocSetContextCreate(CurrentMemoryContext, "ReorderBuffer", @@ -266,6 +270,9 @@ ReorderBufferAllocate(void) dlist_init(&buffer->toplevel_by_lsn); + /* Ensure there's no stale data from prior uses of this slot */ + ReorderBufferCleanSerializedTXNs(NameStr(MyReplicationSlot->data.name)); + return buffer; } @@ -2551,6 +2558,47 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* + * Remove any leftover serialized reorder buffers from a slot directory after a + * prior crash or decoding session exit. + */ +static void +ReorderBufferCleanSerializedTXNs(const char *slotname) +{ + DIR *spill_dir; + struct dirent *spill_de; + struct stat statbuf; + char path[MAXPGPATH * 2 + 12]; + + sprintf(path, "pg_replslot/%s", slotname); + + /* we're only handling directories here, skip if it's not our's */ + if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode)) + return; + + spill_dir = AllocateDir(path); + while ((spill_de = ReadDir(spill_dir, path)) != NULL) + { + if (strcmp(spill_de->d_name, ".") == 0 || + strcmp(spill_de->d_name, "..") == 0) + continue; + + /* only look at names that can be ours */ + if (strncmp(spill_de->d_name, "xid", 3) == 0) + { + sprintf(path, "pg_replslot/%s/%s", slotname, + spill_de->d_name); + + if (unlink(path) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", + path))); + } + } + FreeDir(spill_dir); +} + +/* * Delete all data spilled to disk after we've restarted/crashed. It will be * recreated when the respective slots are reused. */ @@ -2560,15 +2608,9 @@ StartupReorderBuffer(void) DIR *logical_dir; struct dirent *logical_de; - DIR *spill_dir; - struct dirent *spill_de; - logical_dir = AllocateDir("pg_replslot"); while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL) { - struct stat statbuf; - char path[MAXPGPATH * 2 + 12]; - if (strcmp(logical_de->d_name, ".") == 0 || strcmp(logical_de->d_name, "..") == 0) continue; @@ -2581,33 +2623,7 @@ StartupReorderBuffer(void) * ok, has to be a surviving logical slot, iterate and delete * everything starting with xid-* */ - sprintf(path, "pg_replslot/%s", logical_de->d_name); - - /* we're only creating directories here, skip if it's not our's */ - if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode)) - continue; - - spill_dir = AllocateDir(path); - while ((spill_de = ReadDir(spill_dir, path)) != NULL) - { - if (strcmp(spill_de->d_name, ".") == 0 || - strcmp(spill_de->d_name, "..") == 0) - continue; - - /* only look at names that can be ours */ - if (strncmp(spill_de->d_name, "xid", 3) == 0) - { - sprintf(path, "pg_replslot/%s/%s", logical_de->d_name, - spill_de->d_name); - - if (unlink(path) != 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not remove file \"%s\": %m", - path))); - } - } - FreeDir(spill_dir); + ReorderBufferCleanSerializedTXNs(logical_de->d_name); } FreeDir(logical_dir); } -- 2.9.5