The first version of this patch was incorrectly created on top of the failover-slots patch rather than HEAD. The attached patch applies on HEAD.
From 87d839f8a2e78abb17fa985502fd5b66f0872b57 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 16 Mar 2016 15:45:16 +0800
Subject: [PATCH 1/2] Correct incorrect claim that slots output a change
 "exactly once"
Replication slots may actually emit a change more than once if the
master crashes before flushing the slot. See
http://www.postgresql.org/message-id/camsr+ygsatrgqpcx9qx4eocizwsa27xjkeipsotaje8ofix...@mail.gmail.com
for details.
---
 doc/src/sgml/logicaldecoding.sgml | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index e841348..78e3dba 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -12,7 +12,6 @@
 
  <para>
   Changes are sent out in streams identified by logical replication slots.
-  Each stream outputs each change exactly once.
  </para>
 
  <para>
@@ -204,8 +203,7 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
    In the context of logical replication, a slot represents a stream of
    changes that can be replayed to a client in the order they were made on
    the origin server. Each slot streams a sequence of changes from a single
-   database, sending each change exactly once (except when peeking forward
-   in the stream).
+   database.
   </para>
 
   <note>
@@ -218,7 +216,17 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
   <para>
    A replication slot has an identifier that is unique across all databases
    in a <productname>PostgreSQL</productname> cluster. Slots persist
-   independently of the connection using them and are crash-safe.
+   independently of the connection using them. Slot creation and drop are
+   crash-safe, and slots will never be corrupted by a crash.
+  </para>
+
+  <para>
+   A logical slot outputs each database change at least once. A slot will
+   usually only emit a change once, but recently-sent changes may be sent
+   again if the server crashes and restarts. Clients should remember the
+   last LSN they saw when decoding and skip over any repeated data or
+   (when using the replication protocol) request that decoding start from
+   that LSN rather than letting the server determine the start point.
+  </para>
 
   <para>
-- 
2.1.0
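As an aside, to make the advice in the new paragraph concrete, the client-side handling could look roughly like the sketch below, using the SQL interface via libpq. The connection string, the slot name "test", and keeping last_applied_lsn in a local buffer rather than in the client's durable storage are illustrative assumptions, not something the patch adds.

/*
 * Minimal sketch of "remember the last LSN and skip repeats" for a
 * logical decoding client using the SQL interface. Illustrative only.
 */
#include <stdio.h>
#include <string.h>
#include <libpq-fe.h>

/* Compare two LSNs of the form "X/X" numerically, not as strings. */
static int
lsn_cmp(const char *a, const char *b)
{
	unsigned int ahi = 0, alo = 0, bhi = 0, blo = 0;

	sscanf(a, "%X/%X", &ahi, &alo);
	sscanf(b, "%X/%X", &bhi, &blo);
	if (ahi != bhi)
		return (ahi < bhi) ? -1 : 1;
	if (alo != blo)
		return (alo < blo) ? -1 : 1;
	return 0;
}

int
main(void)
{
	/* In a real client this would be loaded from durable local storage. */
	char		last_applied_lsn[32] = "0/0";
	PGconn	   *conn = PQconnectdb("dbname=postgres");
	PGresult   *res;
	int			i;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		return 1;
	}

	/* Columns are (location, xid, data); the first is the change's LSN. */
	res = PQexec(conn,
				 "SELECT * FROM pg_logical_slot_get_changes('test', NULL, NULL)");
	if (PQresultStatus(res) != PGRES_TUPLES_OK)
	{
		fprintf(stderr, "decoding failed: %s", PQerrorMessage(conn));
		return 1;
	}

	for (i = 0; i < PQntuples(res); i++)
	{
		const char *lsn = PQgetvalue(res, i, 0);

		/*
		 * Skip anything at or below what we already applied; after a
		 * server crash the slot may emit recent changes a second time.
		 */
		if (lsn_cmp(lsn, last_applied_lsn) <= 0)
			continue;

		printf("%s: %s\n", lsn, PQgetvalue(res, i, 2));
		strncpy(last_applied_lsn, lsn, sizeof(last_applied_lsn) - 1);
	}

	PQclear(res);
	PQfinish(conn);
	return 0;
}

A client using the walsender protocol can instead pass its last durably recorded LSN as the start point, e.g. via pg_recvlogical's --startpos option, rather than letting the server resume from its saved confirmed position.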
From fdfe91482d7dd28920db67067d77388ef3871165 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 16 Mar 2016 15:12:34 +0800
Subject: [PATCH 2/2] Dirty replication slots when confirm_lsn is changed

---
 src/backend/replication/logical/logical.c | 62 +++++++++++++++++++++----------
 1 file changed, 42 insertions(+), 20 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2e6d3f9..40db6ff 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -437,6 +437,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 	}
 
 	ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	ReplicationSlotMarkDirty();
 }
 
 /*
@@ -847,10 +848,15 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 	{
 		bool		updated_xmin = false;
 		bool		updated_restart = false;
+		bool		updated_confirm = false;
 
 		SpinLockAcquire(&MyReplicationSlot->mutex);
 
-		MyReplicationSlot->data.confirmed_flush = lsn;
+		if (MyReplicationSlot->data.confirmed_flush != lsn)
+		{
+			MyReplicationSlot->data.confirmed_flush = lsn;
+			updated_confirm = true;
+		}
 
 		/* if were past the location required for bumping xmin, do so */
 		if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
@@ -888,34 +894,50 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 
 		SpinLockRelease(&MyReplicationSlot->mutex);
 
-		/* first write new xmin to disk, so we know whats up after a crash */
-		if (updated_xmin || updated_restart)
+		if (updated_xmin || updated_restart || updated_confirm)
 		{
 			ReplicationSlotMarkDirty();
-			ReplicationSlotSave();
-			elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
-		}
 
-		/*
-		 * Now the new xmin is safely on disk, we can let the global value
-		 * advance. We do not take ProcArrayLock or similar since we only
-		 * advance xmin here and there's not much harm done by a concurrent
-		 * computation missing that.
-		 */
-		if (updated_xmin)
-		{
-			SpinLockAcquire(&MyReplicationSlot->mutex);
-			MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
-			SpinLockRelease(&MyReplicationSlot->mutex);
+			/*
+			 * first write new xmin to disk, so we know whats up
+			 * after a crash.
+			 */
+			if (updated_xmin || updated_restart)
+			{
+				ReplicationSlotSave();
+				elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
+			}
 
-			ReplicationSlotsComputeRequiredXmin(false);
-			ReplicationSlotsComputeRequiredLSN();
+			/*
+			 * Now the new xmin is safely on disk, we can let the global value
+			 * advance. We do not take ProcArrayLock or similar since we only
+			 * advance xmin here and there's not much harm done by a concurrent
+			 * computation missing that.
+			 */
+			if (updated_xmin)
+			{
+				SpinLockAcquire(&MyReplicationSlot->mutex);
+				MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
+				SpinLockRelease(&MyReplicationSlot->mutex);
+
+				ReplicationSlotsComputeRequiredXmin(false);
+				ReplicationSlotsComputeRequiredLSN();
+			}
 		}
 	}
 	else
 	{
+		bool		dirtied = false;
+
 		SpinLockAcquire(&MyReplicationSlot->mutex);
-		MyReplicationSlot->data.confirmed_flush = lsn;
+		if (MyReplicationSlot->data.confirmed_flush != lsn)
+		{
+			MyReplicationSlot->data.confirmed_flush = lsn;
+			dirtied = true;
+		}
 		SpinLockRelease(&MyReplicationSlot->mutex);
+
+		if (dirtied)
+			ReplicationSlotMarkDirty();
 	}
 }
-- 
2.1.0
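To see what this is protecting, the sketch below consumes changes through the SQL interface, which ends up in LogicalConfirmReceivedLocation(). It's illustrative only: it assumes a 9.6-era server where pg_replication_slots exposes confirmed_flush_lsn, plus the same hypothetical "test" slot. With the patch applied the confirmed_flush advance marks the slot dirty, so the next checkpoint writes it out and the reported LSN should survive a restart instead of reverting to an older value.

/*
 * Rough check of confirmed_flush persistence for a logical slot.
 * Illustrative only; assumes a slot named "test" already exists.
 */
#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	PGconn	   *conn = PQconnectdb("dbname=postgres");
	PGresult   *res;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		return 1;
	}

	/* Consume pending changes, which confirms their receipt on the slot. */
	res = PQexec(conn,
				 "SELECT count(*)"
				 "  FROM pg_logical_slot_get_changes('test', NULL, NULL)");
	PQclear(res);

	/*
	 * The confirmed position now reflects the consumed changes; with the
	 * patch the slot is marked dirty so a checkpoint will persist it.
	 */
	res = PQexec(conn,
				 "SELECT confirmed_flush_lsn FROM pg_replication_slots"
				 " WHERE slot_name = 'test'");
	if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 1)
		printf("confirmed_flush_lsn = %s\n", PQgetvalue(res, 0, 0));

	PQclear(res);
	PQfinish(conn);
	return 0;
}

Note the patch only marks the slot dirty in the confirm-only path rather than forcing an immediate ReplicationSlotSave(), so there is no extra fsync per confirmation; persistence piggybacks on the next checkpoint.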