Hey. Currently synchronous_commit is disabled by default for the logical apply
worker, on the grounds that the reported flush_lsn includes only locally
flushed data, so the slot (publisher) preserves everything beyond it and no
data is lost if the subscriber restarts. However, imagine that the subscriber
is made highly available by a standby to which synchronous replication is
enabled. The reported flush_lsn then knows nothing about synchronous
replication progress, and on failover data loss may occur if the subscriber
managed to ack a flush_lsn ahead of syncrep. Moreover, the loss is almost
silent because of this

    else if (start_lsn < slot->data.confirmed_flush)
    {
        /*
         * It might seem like we should error out in this case, but it's
         * pretty common for a client to acknowledge a LSN it doesn't have to
         * do anything for, and thus didn't store persistently, because the
         * xlog records didn't result in anything relevant for logical
         * decoding. Clients have to be able to do that to support synchronous
         * replication.
         *
         * Starting at a different LSN than requested might not catch certain
         * kinds of client errors; so the client may wish to check that
         * confirmed_flush_lsn matches its expectations.
         */
        elog(LOG, "%X/%X has been already streamed, forwarding to %X/%X",
             LSN_FORMAT_ARGS(start_lsn),
             LSN_FORMAT_ARGS(slot->data.confirmed_flush));

        start_lsn = slot->data.confirmed_flush;
    }


in logical.c
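
To make the silent part concrete, here is a minimal standalone sketch of what
that branch does to a promoted standby's start request. It is not PostgreSQL
code: lsn_t, effective_start() and skipped_wal() are hypothetical names made
up for illustration, and LSNs are treated as plain 64-bit integers.

```c
#include <stdint.h>

/* Stand-in for XLogRecPtr; an assumption made for this sketch only. */
typedef uint64_t lsn_t;

/*
 * Mirrors the branch quoted above: a requested start point that lies
 * behind the slot's confirmed_flush is moved forward to confirmed_flush.
 */
static lsn_t
effective_start(lsn_t requested_start, lsn_t confirmed_flush)
{
    return requested_start < confirmed_flush ? confirmed_flush
                                             : requested_start;
}

/*
 * Amount of WAL that is never re-sent to a subscriber that asks to
 * start at requested_start while the slot is already at confirmed_flush.
 */
static lsn_t
skipped_wal(lsn_t requested_start, lsn_t confirmed_flush)
{
    return effective_start(requested_start, confirmed_flush) - requested_start;
}
```

If the old subscriber acked up to 300 while its standby had only received up
to 200, the promoted standby asks for 200, gets forwarded to 300, and
skipped_wal(200, 300) is the range it never sees. The only trace is the LOG
line.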

Attached draft patch fixes this by taking into account syncrep progress in
worker reporting.
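
The idea, stripped of the PostgreSQL specifics, is roughly the following. This
is only a standalone sketch: lsn_t, reportable_flush() and its parameters are
hypothetical names for illustration, while the patch itself uses
GetFlushRecPtr(), SyncRepStandbyNames and WalSndCtl->lsn[SYNC_REP_WAIT_FLUSH].

```c
#include <stdint.h>

/* Stand-in for XLogRecPtr; an assumption made for this sketch only. */
typedef uint64_t lsn_t;

/*
 * Flush position that is safe to report to the publisher.
 *
 * local_flush     - how far this node has flushed its own WAL
 * syncrep_enabled - nonzero if synchronous standbys are configured
 * syncrep_flush   - how far the synchronous standby has confirmed flush
 *
 * With syncrep configured, never report more than the standby has
 * confirmed; otherwise the local position is used unchanged.
 */
static lsn_t
reportable_flush(lsn_t local_flush, int syncrep_enabled, lsn_t syncrep_flush)
{
    if (syncrep_enabled && syncrep_flush < local_flush)
        return syncrep_flush;
    return local_flush;
}
```

With local_flush at 300 and the standby confirmed up to 200, the worker would
report 200, so the publisher keeps everything from 200 onwards for a possible
failover.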


-- cheers, arseny


diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 832b1cf764..9595748caa 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -183,6 +183,7 @@
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
 #include "replication/walreceiver.h"
+#include "replication/walsender_private.h"
 #include "replication/worker_internal.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/buffile.h"
@@ -3408,6 +3409,16 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
 	dlist_mutable_iter iter;
 	XLogRecPtr	local_flush = GetFlushRecPtr(NULL);
 
+	/*
+	 * If synchronous replication is configured, take into account its position.
+	 */
+	if (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
+	{
+		LWLockAcquire(SyncRepLock, LW_SHARED);
+		local_flush = Min(local_flush, WalSndCtl->lsn[SYNC_REP_WAIT_FLUSH]);
+		LWLockRelease(SyncRepLock);
+	}
+
 	*write = InvalidXLogRecPtr;
 	*flush = InvalidXLogRecPtr;
 
