The first patch was incorrectly created on top of failover slots, not HEAD.
Attached patch applies on HEAD.
From 87d839f8a2e78abb17fa985502fd5b66f0872b57 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 16 Mar 2016 15:45:16 +0800
Subject: [PATCH 1/2] Correct incorrect claim that slots output a change
 "exactly once"

Replication slots may actually emit a change more than once
if the master crashes before flushing the slot.

See
http://www.postgresql.org/message-id/camsr+ygsatrgqpcx9qx4eocizwsa27xjkeipsotaje8ofix...@mail.gmail.com
for details.
---
 doc/src/sgml/logicaldecoding.sgml | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index e841348..78e3dba 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -12,7 +12,6 @@
 
   <para>
    Changes are sent out in streams identified by logical replication slots.
-   Each stream outputs each change exactly once.
   </para>
 
   <para>
@@ -204,8 +203,7 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
      In the context of logical replication, a slot represents a stream of
      changes that can be replayed to a client in the order they were made on
      the origin server. Each slot streams a sequence of changes from a single
-     database, sending each change exactly once (except when peeking forward
-     in the stream).
+     database.
     </para>
 
     <note>
@@ -218,7 +216,17 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
     <para>
      A replication slot has an identifier that is unique across all databases
      in a <productname>PostgreSQL</productname> cluster. Slots persist
-     independently of the connection using them and are crash-safe.
+     independently of the connection using them. Slot creation and drop are
+     crash-safe, and slots will never be corrupted by a crash.
+    </para>
+
+    <para>
+     A logical slot outputs each database change at least once. A slot will
+     usually only emit a change once, but recently-sent changes may be sent
+     again if the server crashes and restarts. Clients should remember
+     the last LSN they saw when decoding and skip over any repeated data or
+     (when using the replication protocol) request that decoding start from
+     that LSN rather than letting the server determine the start point.
     </para>
 
     <para>
-- 
2.1.0

From fdfe91482d7dd28920db67067d77388ef3871165 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 16 Mar 2016 15:12:34 +0800
Subject: [PATCH 2/2] Dirty replication slots when confirm_lsn is changed

---
 src/backend/replication/logical/logical.c | 62 +++++++++++++++++++++----------
 1 file changed, 42 insertions(+), 20 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2e6d3f9..40db6ff 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -437,6 +437,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 	}
 
 	ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	ReplicationSlotMarkDirty();
 }
 
 /*
@@ -847,10 +848,15 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 	{
 		bool		updated_xmin = false;
 		bool		updated_restart = false;
+		bool		updated_confirm = false;
 
 		SpinLockAcquire(&MyReplicationSlot->mutex);
 
-		MyReplicationSlot->data.confirmed_flush = lsn;
+		if (MyReplicationSlot->data.confirmed_flush != lsn)
+		{
+			MyReplicationSlot->data.confirmed_flush = lsn;
+			updated_confirm = true;
+		}
 
 		/* if were past the location required for bumping xmin, do so */
 		if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
@@ -888,34 +894,50 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 
 		SpinLockRelease(&MyReplicationSlot->mutex);
 
-		/* first write new xmin to disk, so we know whats up after a crash */
-		if (updated_xmin || updated_restart)
+		if (updated_xmin || updated_restart || updated_confirm)
 		{
 			ReplicationSlotMarkDirty();
-			ReplicationSlotSave();
-			elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
-		}
 
-		/*
-		 * Now the new xmin is safely on disk, we can let the global value
-		 * advance. We do not take ProcArrayLock or similar since we only
-		 * advance xmin here and there's not much harm done by a concurrent
-		 * computation missing that.
-		 */
-		if (updated_xmin)
-		{
-			SpinLockAcquire(&MyReplicationSlot->mutex);
-			MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
-			SpinLockRelease(&MyReplicationSlot->mutex);
+			/*
+			 * first write new xmin to disk, so we know what's up
+			 * after a crash.
+			 */
+			if (updated_xmin || updated_restart)
+			{
+				ReplicationSlotSave();
+				elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
+			}
 
-			ReplicationSlotsComputeRequiredXmin(false);
-			ReplicationSlotsComputeRequiredLSN();
+			/*
+			 * Now the new xmin is safely on disk, we can let the global value
+			 * advance. We do not take ProcArrayLock or similar since we only
+			 * advance xmin here and there's not much harm done by a concurrent
+			 * computation missing that.
+			 */
+			if (updated_xmin)
+			{
+				SpinLockAcquire(&MyReplicationSlot->mutex);
+				MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
+				SpinLockRelease(&MyReplicationSlot->mutex);
+
+				ReplicationSlotsComputeRequiredXmin(false);
+				ReplicationSlotsComputeRequiredLSN();
+			}
 		}
 	}
 	else
 	{
+		bool dirtied = false;
+
 		SpinLockAcquire(&MyReplicationSlot->mutex);
-		MyReplicationSlot->data.confirmed_flush = lsn;
+		if (MyReplicationSlot->data.confirmed_flush != lsn)
+		{
+			MyReplicationSlot->data.confirmed_flush = lsn;
+			dirtied = true;
+		}
 		SpinLockRelease(&MyReplicationSlot->mutex);
+
+		if (dirtied)
+			ReplicationSlotMarkDirty();
 	}
 }
-- 
2.1.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to