Hi all,

I've found a minor off-by-one error in the resource retention logic for logical slots, where we treat confirmed_flush as meaning "flushed up to and including this LSN". That seems reasonable, but the rest of the code treats it as "flushed up to but excluding this LSN". In particular, if the client provides no start point, we use confirmed_flush as the inclusive start point for a new logical decoding session, and will replay to the client a change whose commit record begins at exactly confirmed_flush.
This was identified while debugging a case where logical replication replicated duplicate rows after an unclean shutdown, on a table with no PK.

I'd prefer to make confirmed_flush mean "confirmed flushed up to and including" everywhere, but the knock-on effects are too ugly. In particular, we'd then be changing the meaning of the LSN argument to START_REPLICATION ... LOGICAL ... to "start replay of commits after, but not including, this LSN". We can't just adjust it by 1 either, otherwise we'd suddenly get different results if we passed the confirmed_flush value explicitly vs passing 0/0 and letting it be picked up implicitly.

Two further minor patches follow: one fixes a harmless but confusing quirk in logical walsender init, and one adds explanatory comments to relevant parts of the code (and a couple of spots in the docs) to avoid future occurrences of the above issues.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From f3a3f0d088576e52534f13fc3cd3c23b409d8a9f Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Thu, 9 Mar 2017 10:04:20 +0800
Subject: [PATCH 1/3] Fix off-by-one in logical slot resource retention

START_REPLICATION ... LOGICAL, the snapshot builder's
SnapBuildXactNeedsSkip(...), etc. treat an LSN that exactly equals the
start of a commit record as being inclusive, i.e. we should process that
commit and replay it to the client.

If the client does not specify a start point, the start value used is
the replication slot's confirmed_flush LSN, so "confirmed flush" really
means "confirmed flushed up to and not including" - we're replaying the
transaction so we're assuming the client does not already have it.

This is consistent with usage elsewhere, such as walsender's sentPtr
meaning "sent up to but not including this point".

However, LogicalIncreaseXminForSlot(...) and
LogicalIncreaseRestartDecodingForSlot(...) treat confirmed_flush as
meaning "client has flushed everything up to and including this LSN", so
they may remove resources needed by a commit that started at exactly
confirmed_flush.

This isn't a big issue in practice. Typically a walsender-using client
will promptly send standby status updates that advance confirmed_flush
past the start-of-commit LSN. The SQL interface advances the replication
identifier to the LSN of end-of-commit + 1, so it is unaffected.
---
 src/backend/replication/logical/logical.c | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5529ac8..7e03f33 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -778,8 +778,11 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
 	 * If the client has already confirmed up to this lsn, we directly can
 	 * mark this as accepted. This can happen if we restart decoding in a
 	 * slot.
+	 *
+	 * confirmed_flush actually points to the first unconfirmed LSN, so we must
+	 * not remove resources exactly at confirmed_flush.
 	 */
-	else if (current_lsn <= slot->data.confirmed_flush)
+	else if (current_lsn < slot->data.confirmed_flush)
 	{
 		slot->candidate_catalog_xmin = xmin;
 		slot->candidate_xmin_lsn = current_lsn;
@@ -833,8 +836,12 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 	/*
 	 * We might have already flushed far enough to directly accept this lsn,
 	 * in this case there is no need to check for existing candidate LSNs
+	 *
+	 * The < is intentional: confirmed_flush points to the LSN immediately
+	 * after the point confirmed on-disk by the client, so we mustn't remove
+	 * resources for commits exactly at confirmed_flush.
 	 */
-	else if (current_lsn <= slot->data.confirmed_flush)
+	else if (current_lsn < slot->data.confirmed_flush)
 	{
 		slot->candidate_restart_valid = current_lsn;
 		slot->candidate_restart_lsn = restart_lsn;
-- 
2.5.5
From a06928b00c2a9dfa0863208196ead8c0eec87afe Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Thu, 9 Mar 2017 11:10:01 +0800
Subject: [PATCH 2/3] Start sentPtr for logical walsender from restart_lsn

In logical streaming mode the walsender would initialize its internal
sentPtr to the slot's confirmed_flush position, and the sentPtr exposed
in shmem to the slot's restart_lsn. Subsequent updates to both would use
the last-decoded WAL record.

Since we overwrite the internal sentPtr before using it, this is clearly
an oversight from an earlier iteration of the logical decoding patch
set. Initialize both to restart_lsn instead.
---
 src/backend/replication/walsender.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index dd3a936..ec5d9db 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -942,14 +942,14 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * Report the location after which we'll send out further commits as the
 	 * current sentPtr.
 	 */
-	sentPtr = MyReplicationSlot->data.confirmed_flush;
+	sentPtr = MyReplicationSlot->data.restart_lsn;
 
 	/* Also update the sent position status in shared memory */
 	{
 		WalSnd	   *walsnd = MyWalSnd;
 
 		SpinLockAcquire(&walsnd->mutex);
-		walsnd->sentPtr = MyReplicationSlot->data.restart_lsn;
+		walsnd->sentPtr = sentPtr;
 		SpinLockRelease(&walsnd->mutex);
 	}
 
-- 
2.5.5
From 8a20341eaf165252c685d92c01718222923c4799 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Thu, 9 Mar 2017 10:53:55 +0800
Subject: [PATCH 3/3] Documentation and comments fixes relating to
 replication position tracking

There is some confusing code in the walsender and in logical decoding
that treats a "sentPtr" variable as "sent up to but not including", and
treats the "confirmed_flush" value in a replication slot as "confirmed
up to but not including". The latter has already resulted in an
off-by-one error where confirmed_flush was assumed to mean "confirmed
flushed up to and including" in some places.

Additionally, some log message output for logical decoding stated that
commits "after" the specified LSN would be replayed, when in fact it is
commits beginning at or after the specified LSN that will be replayed.

Improve comments, messages and documentation to emphasise that sentPtr
and confirmed_flush are "up to and excluding", that the
pg_stat_replication fields are the first un-sent/un-flushed/un-committed
LSN, that replication functions start at the LSN specified inclusive,
etc.

Additionally, document the restart_lsn and confirmed_flush LSN in struct
ReplicationSlotPersistentData.
---
 doc/src/sgml/func.sgml                      |  8 +++---
 doc/src/sgml/logicaldecoding.sgml           |  3 ++-
 doc/src/sgml/monitoring.sgml                | 15 +++++++----
 doc/src/sgml/protocol.sgml                  | 20 +++++++++-----
 src/backend/replication/logical/logical.c   | 21 ++++++++++-----
 src/backend/replication/walsender.c         | 42 +++++++++++++++++++++++------
 src/include/replication/reorderbuffer.h     |  4 +--
 src/include/replication/slot.h              | 25 +++++++++++++----
 src/include/replication/walsender_private.h |  2 +-
 9 files changed, 103 insertions(+), 37 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 583b3b2..f6f74cd 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -18815,9 +18815,11 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
         (<parameter>location</parameter> <type>pg_lsn</type>, <parameter>xid</parameter> <type>xid</type>, <parameter>data</parameter> <type>text</type>)
        </entry>
        <entry>
-        Returns changes in the slot <parameter>slot_name</parameter>, starting
-        from the point at which since changes have been consumed last.  If
-        <parameter>upto_lsn</> and <parameter>upto_nchanges</> are NULL,
+        Returns changes in the <link
+        linkend="logicaldecoding-replication-slots">logical slot</>
+        <parameter>slot_name</parameter>, starting from the point at which
+        changes were last consumed. If <parameter>upto_lsn</> and
+        <parameter>upto_nchanges</> are NULL,
         logical decoding will continue until end of WAL.  If
         <parameter>upto_lsn</> is non-NULL, decoding will include only those
         transactions which commit prior to the specified LSN.  If
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 03c2c69..30874ce 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -11,7 +11,8 @@
   </para>
 
   <para>
-   Changes are sent out in streams identified by logical replication slots.
+   Changes are sent out in streams identified by <link
+   linkend="logicaldecoding-replication-slots">logical replication slots</>.
   </para>
 
   <para>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 4d03531..7f33f91 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1404,24 +1404,29 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
     <row>
      <entry><structfield>sent_location</></entry>
      <entry><type>pg_lsn</></entry>
-     <entry>Last transaction log position sent on this connection</entry>
+     <entry>
+      Last transaction log position + 1 sent on this connection, or, for
+      logical decoding sessions, the log position of the last record processed
+      and buffered for sending at commit-time.
+     </entry>
     </row>
     <row>
      <entry><structfield>write_location</></entry>
      <entry><type>pg_lsn</></entry>
-     <entry>Last transaction log position written to disk by this standby
-      server</entry>
+     <entry>
+      Last transaction log position + 1 written to disk by this standby
+      server, i.e. the start of the first record not written.</entry>
     </row>
     <row>
      <entry><structfield>flush_location</></entry>
      <entry><type>pg_lsn</></entry>
-     <entry>Last transaction log position flushed to disk by this standby
+     <entry>Last transaction log position + 1 flushed to disk by this standby
       server</entry>
     </row>
     <row>
      <entry><structfield>replay_location</></entry>
      <entry><type>pg_lsn</></entry>
-     <entry>Last transaction log position replayed into the database on this
+     <entry>Last transaction log position + 1 replayed into the database on this
       standby server</entry>
     </row>
     <row>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 3d6e8ee..f8233df 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1925,11 +1925,13 @@ The commands accepted in walsender mode are:
     <term><literal>START_REPLICATION</literal> <literal>SLOT</literal> <replaceable class="parameter">slot_name</> <literal>LOGICAL</literal> <replaceable class="parameter">XXX/XXX</> [ ( <replaceable>option_name</replaceable> [ <replaceable>option_value</replaceable> ] [, ...] ) ]</term>
     <listitem>
      <para>
-      Instructs server to start streaming WAL for logical replication, starting
-      at WAL position <replaceable class="parameter">XXX/XXX</>. The server can
-      reply with an error, for example if the requested section of WAL has already
-      been recycled. On success, server responds with a CopyBothResponse
-      message, and then starts to stream WAL to the frontend.
+      Instructs server to start streaming WAL for <link
+      linkend="logicaldecoding">logical decoding</>, starting at the first
+      commit whose commit record begins at or after WAL position <replaceable
+      class="parameter">XXX/XXX</>. The server can reply with an error, for
+      example if the requested section of WAL has already been recycled. On
+      success, server responds with a <literal>CopyBothResponse</> message, and
+      then starts to stream WAL to the frontend.
      </para>
 
      <para>
@@ -1958,7 +1960,13 @@ The commands accepted in walsender mode are:
       <term><replaceable class="parameter">XXX/XXX</></term>
       <listitem>
        <para>
-        The WAL position to begin streaming at.
+        The WAL position to begin streaming commits at. Inclusive; a commit record
+        beginning at exactly XXX/XXX will be streamed to the client.
+       </para>
+       <para>
+        If the requested WAL position is less than the <literal>confirmed_flush_lsn</>
+        for <replaceable class="parameter">slot_name</>, it is ignored and the confirmed
+        flush position is used as the start point.
        </para>
       </listitem>
      </varlistentry>
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 7e03f33..68193de 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -304,10 +304,13 @@ CreateInitDecodingContext(char *plugin,
  *		used already.
  *
  * start_lsn
- *		The LSN at which to start decoding.  If InvalidXLogRecPtr, restart
- *		from the slot's confirmed_flush; otherwise, start from the specified
- *		location (but move it forwards to confirmed_flush if it's older than
- *		that, see below).
+ *		The LSN at which to start sending commits to the output plugin. If
+ *		InvalidXLogRecPtr, restart from the slot's confirmed_flush; otherwise,
+ *		start from the specified location (but move it forwards to
+ *		confirmed_flush if it's older than that, see below).
+ *
+ *		start_lsn is inclusive, so a commit beginning exactly at start_lsn
+ *		will be sent to the client.
  *
  * output_plugin_options
  *		contains options passed to the output plugin.
@@ -320,6 +323,10 @@ CreateInitDecodingContext(char *plugin,
  * as the decoding context because further memory contexts will be created
  * inside it.
  *
+ * WAL reading and logical decoding always start at restart_lsn and are not
+ * controlled by start_lsn. That argument only controls which decoded commits
+ * are sent to the client.
+ *
  * Returns an initialized decoding context after calling the output plugin's
  * startup function.
  */
@@ -389,7 +396,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 	ereport(LOG,
 			(errmsg("starting logical decoding for slot \"%s\"",
 					NameStr(slot->data.name)),
-			 errdetail("streaming transactions committing after %X/%X, reading WAL from %X/%X",
+			 errdetail("streaming transactions committing at or after %X/%X, reading WAL from %X/%X",
 					   (uint32) (slot->data.confirmed_flush >> 32),
 					   (uint32) slot->data.confirmed_flush,
 					   (uint32) (slot->data.restart_lsn >> 32),
@@ -446,6 +453,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 		CHECK_FOR_INTERRUPTS();
 	}
 
+	/* Start output to client for commits after end of last record */
 	ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
 }
 
@@ -885,7 +893,8 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
 }
 
 /*
- * Handle a consumer's confirmation having received all changes up to lsn.
+ * Handle a consumer's confirmation having received all changes up to (but not
+ * including) lsn.
  */
 void
 LogicalConfirmReceivedLocation(XLogRecPtr lsn)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ec5d9db..35a0e7c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -142,8 +142,10 @@ static bool sendTimeLineIsHistoric = false;
 static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
 
 /*
- * How far have we sent WAL already? This is also advertised in
- * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
+ * We have sent WAL up to, but not including, this position.
+ * The next request will start from this position.
+ *
+ * Also advertised in MyWalSnd->sentPtr.
  */
 static XLogRecPtr sentPtr = 0;
 
@@ -888,6 +890,8 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
 /*
  * Load previously initiated logical slot and prepare for sending data (via
  * WalSndLoop).
+ *
+ * Handles START_REPLICATION ... LOGICAL ...
  */
 static void
 StartLogicalReplication(StartReplicationCmd *cmd)
@@ -927,20 +931,28 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	sendTimeLine = ThisTimeLineID;
 
 	/*
-	 * Initialize position to the last ack'ed one, then the xlog records begin
-	 * to be shipped from that position.
+	 * Let logical decoding decide where to start reading WAL and where to
+	 * start sending commits to the client, giving it the client-supplied start
+	 * point so it can skip over any unwanted commits the client has already
+	 * processed.
 	 */
 	logical_decoding_ctx = CreateDecodingContext(
 		cmd->startpoint, cmd->options,
 		logical_read_xlog_page,
 		WalSndPrepareWrite, WalSndWriteData);
 
-	/* Start reading WAL from the oldest required WAL. */
+	/*
+	 * Start reading WAL from the oldest required WAL.
+	 *
+	 * This is just a parameter to XLogSendLogical passed via a global.
+	 */
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
 
 	/*
-	 * Report the location after which we'll send out further commits as the
-	 * current sentPtr.
+	 * Report the location we start processing WAL from as the "sent" location,
+	 * even though it will generally be well behind cmd->startpoint.
+	 *
+	 * See XLogSendLogical for rationale.
 	 */
 	sentPtr = MyReplicationSlot->data.restart_lsn;
 
@@ -2388,7 +2400,7 @@ XLogSendLogical(void)
 	WalSndCaughtUp = false;
 
 	record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
-	logical_startptr = InvalidXLogRecPtr;
+	logical_startptr = InvalidXLogRecPtr;	/* no longer used */
 
 	/* xlog record was invalid */
 	if (errm != NULL)
@@ -2398,6 +2410,20 @@ XLogSendLogical(void)
 	{
 		LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
 
+		/*
+		 * Report the "sent" pointer as the LSN after the end of the most
+		 * recent xlog record we've processed in logical decoding. This is
+		 * somewhat misleading since we have "sent" the record to logical
+		 * decoding, but not yet to the output plugin or the client itself.
+		 *
+		 * Due to reorder buffer processing we can't really report any other
+		 * measure of progress in terms of an LSN, though. If we reported
+		 * the LSN of the last row change during reorder buffer commit
+		 * processing the LSNs would go backwards whenever we started
+		 * processing the next commit (if they were running concurrently),
+		 * and we'd have nothing to report when we weren't processing a
+		 * commit since we're just buffering.
+		 */
 		sentPtr = logical_decoding_ctx->reader->EndRecPtr;
 	}
 	else
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 17e47b3..4b2f73c 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -160,8 +160,8 @@ typedef struct ReorderBufferTXN
 	XLogRecPtr	first_lsn;
 
 	/* ----
-	 * LSN of the record that lead to this xact to be committed or
-	 * aborted. This can be a
+	 * LSN of the beginning of the record that led to this xact being
+	 * committed or aborted. This can be a
 	 * * plain commit record
 	 * * plain commit record, of a parent transaction
 	 * * prepared transaction commit
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 62cacdb..81b74e2 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -64,14 +64,29 @@ typedef struct ReplicationSlotPersistentData
 	 */
 	TransactionId catalog_xmin;
 
-	/* oldest LSN that might be required by this replication slot */
+	/*
+	 * Oldest LSN that might be required by this replication slot.
+	 *
+	 * For logical decoding this points to the most recent xl_running_xacts
+	 * record prior to the xid allocation (BEGIN) of the oldest xact the client
+	 * has not yet confirmed replay of. WAL will be re-read from this LSN and
+	 * needed changes and invalidations will be assembled into reorder buffers.
+	 */
 	XLogRecPtr	restart_lsn;
 
 	/*
-	 * Oldest LSN that the client has acked receipt for.  This is used as the
-	 * start_lsn point in case the client doesn't specify one, and also as a
-	 * safety measure to jump forwards in case the client specifies a
-	 * start_lsn that's further in the past than this value.
+	 * The client has acked all records up to but not including confirmed_flush
+	 * as safely flushed to client storage.
+	 *
+	 * This is used as the point at which logical decoding begins sending
+	 * changes to the client if the client doesn't specify one. It also serves
+	 * as a safety measure to (silently) jump forwards in case the client
+	 * specifies a start_lsn that's further in the past than this value.
+	 *
+	 * Logical decoding may only invoke the output plugin for changes where the
+	 * start of the commit record is equal to or greater than this LSN.
+	 * catalog_xmin may have been advanced so that needed catalogs for any
+	 * earlier commits have been vacuumed away.
 	 */
 	XLogRecPtr	confirmed_flush;
 
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 5e6ccfc..2db0b2a 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -34,7 +34,7 @@ typedef struct WalSnd
 {
 	pid_t		pid;			/* this walsender's process id, or 0 */
 	WalSndState state;			/* this walsender's state */
-	XLogRecPtr	sentPtr;		/* WAL has been sent up to this point */
+	XLogRecPtr	sentPtr;		/* WAL has been sent up to (but not including) this point */
 	bool		needreload;		/* does currently-open file need to be
 								 * reloaded? */
 
-- 
2.5.5