On 2018-Jul-12, Michael Paquier wrote:

> On Wed, Jul 04, 2018 at 10:50:28AM +0900, Michael Paquier wrote:
> > On Tue, Jul 03, 2018 at 01:17:48AM -0400, Alvaro Herrera wrote:
> > > Let me review tomorrow.
> > 
> > Of course, please feel free.
> 
> Alvaro, are you planning to look at that to close the loop?  The latest
> version is here:
> https://postgr.es/m/20180709070200.gc30...@paquier.xyz

In the immortal words of Julian Bream: "yeah, I didn't like any of that".

I started thinking that the "while we could do X, we better not because
Y" new wording in the comment was misleading -- the comment is precisely
to convey that we must NOT do X, so why say "we could"?  I reworded that
comment a few times until it made sense.  Then I noticed the other
comments were either misplaced or slightly misleading, so I moved them
to their proper places, then reworded them thoroughly.

I also moved some assignments from the declaration section to the code
section, so that I could attach proper comments to each, to improve
clarity of *why* we do those things.

I then noticed that we get an XLogRecord pointer from XLogReadRecord,
but only ever test it for NULL, so I changed the code to use a bool
instead, which I think is clearer.
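
To show the shape of that change outside the PostgreSQL tree, here is a
minimal standalone sketch.  Everything in it (ToyLsn, ToyReader,
toy_read_record, toy_advance) is a hypothetical stand-in invented for
illustration, not a PostgreSQL API.  It also simplifies the real loop:
it stops as soon as a read returns nothing, and it has no error-message
handling.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for XLogRecPtr and the xlog reader. */
typedef uint64_t ToyLsn;
#define TOY_INVALID_LSN ((ToyLsn) 0)

typedef struct ToyRecord
{
    ToyLsn      end;            /* LSN just past this record */
} ToyRecord;

typedef struct ToyReader
{
    const ToyRecord *records;   /* records in LSN order */
    size_t      nrecords;       /* number of entries in records */
    size_t      next;           /* index of the next record to return */
    ToyLsn      end_rec_ptr;    /* end of last record read, or invalid */
    int         processed;      /* records handed to "decoding" so far */
} ToyReader;

/* Return the next record, or NULL once the toy log is exhausted. */
static const ToyRecord *
toy_read_record(ToyReader *reader)
{
    const ToyRecord *rec;

    if (reader->next >= reader->nrecords)
        return NULL;
    rec = &reader->records[reader->next++];
    reader->end_rec_ptr = rec->end;
    return rec;
}

/* Read until moveto is reached or no records remain; return the last end LSN. */
static ToyLsn
toy_advance(ToyReader *reader, ToyLsn moveto)
{
    assert(reader != NULL);

    while (reader->end_rec_ptr < moveto)
    {
        /* Only the success of the read matters, so keep a bool, not the pointer. */
        bool        gotrecord = (toy_read_record(reader) != NULL);

        if (!gotrecord)
            break;
        reader->processed++;    /* stands in for processing the record */
    }
    return reader->end_rec_ptr;
}
```

A caller would initialize end_rec_ptr to TOY_INVALID_LSN and call
toy_advance with the target position.  The returned value is the end of
the last record actually read, which can lie past the requested target.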

I think the proposed comment before the LogicalDecodingProcessRecord
call failed to convey the important ideas, so I rewrote that one also.

There is no struct member called confirmed_flush_lsn anywhere.

The tense of some words in CreateDecodingContext was wrong.

I also back-patched two minor changes from Tom's 3cb646264e8c.

BTW I think I'm starting to have a vague idea of logical decoding now.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 6abc059aaddcdc0e368a9e2c8dec49a09819cbef Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 18 Jul 2018 11:32:55 -0400
Subject: [PATCH] replslot advance comment updates

---
 src/backend/replication/logical/logical.c |  5 ++-
 src/backend/replication/slotfuncs.c       | 75 ++++++++++++++++++++-----------
 2 files changed, 52 insertions(+), 28 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 61588d626f..c9bbdcda74 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -338,7 +338,10 @@ CreateInitDecodingContext(char *plugin,
  *             that, see below).
  *
  * output_plugin_options
- *             contains options passed to the output plugin.
+ *             options passed to the output plugin.
+ *
+ * fast_forward
+ *             bypass the generation of logical changes.
  *
  * read_page, prepare_write, do_write, update_progress
  *             callbacks that have to be filled to perform the use-case dependent,
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 23af32355b..08d20a9470 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -318,9 +318,10 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 /*
  * Helper function for advancing physical replication slot forward.
- * The LSN position to move to is compared simply to the slot's
- * restart_lsn, knowing that any position older than that would be
- * removed by successive checkpoints.
+ *
+ * The LSN position to move to is compared simply to the slot's restart_lsn,
+ * knowing that any position older than that would be removed by successive
+ * checkpoints.
  */
 static XLogRecPtr
 pg_physical_replication_slot_advance(XLogRecPtr moveto)
@@ -341,67 +342,87 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
 
 /*
  * Helper function for advancing logical replication slot forward.
+ *
  * The slot's restart_lsn is used as start point for reading records,
  * while confirmed_lsn is used as base point for the decoding context.
- * The LSN position to move to is checked by doing a per-record scan and
- * logical decoding which makes sure that confirmed_lsn is updated to a
- * LSN which allows the future slot consumer to get consistent logical
- * changes.
+ *
+ * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
+ * because we need to digest WAL to advance restart_lsn allowing to recycle
+ * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
+ * mode, no changes are generated anyway.
  */
 static XLogRecPtr
 pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
        LogicalDecodingContext *ctx;
        ResourceOwner old_resowner = CurrentResourceOwner;
-       XLogRecPtr      startlsn = MyReplicationSlot->data.restart_lsn;
-       XLogRecPtr      retlsn = MyReplicationSlot->data.confirmed_flush;
+       XLogRecPtr      startlsn;
+       XLogRecPtr      retlsn;
 
        PG_TRY();
        {
-               /* restart at slot's confirmed_flush */
+               /*
+                * Create our decoding context in fast_forward mode, passing start_lsn
+                * as Invalid, so that we start processing from confirmed_flush.
+                */
                ctx = CreateDecodingContext(InvalidXLogRecPtr,
                                                                        NIL,
-                                                                       true,
+                                                                       true,   /* fast_forward */
                                                                        logical_read_local_xlog_page,
                                                                        NULL, NULL, NULL);
 
-               CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner,
-                                                                                  "logical decoding");
+               /*
+                * Start reading at the slot's restart_lsn, which we know to point to
+                * a valid record.
+                */
+               startlsn = MyReplicationSlot->data.restart_lsn;
+
+               /* Initialize our return value in case we don't do anything */
+               retlsn = MyReplicationSlot->data.confirmed_flush;
 
                /* invalidate non-timetravel entries */
                InvalidateSystemCaches();
 
-               /* Decode until we run out of records */
-               while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) ||
-                          (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto))
+               /* Decode at least one record, until we run out of records */
+               while ((!XLogRecPtrIsInvalid(startlsn) &&
+                               startlsn < moveto) ||
+                          (!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) &&
+                               ctx->reader->EndRecPtr < moveto))
                {
-                       XLogRecord *record;
                        char       *errm = NULL;
+                       bool            gotrecord;
 
-                       record = XLogReadRecord(ctx->reader, startlsn, &errm);
+                       /*
+                        * Read records.  No changes are generated in fast_forward mode,
+                        * but snapbuilder/slot statuses are updated properly.
+                        */
+                       gotrecord = XLogReadRecord(ctx->reader, startlsn, &errm) != NULL;
                        if (errm)
                                elog(ERROR, "%s", errm);
 
-                       /*
-                        * Now that we've set up the xlog reader state, subsequent calls
-                        * pass InvalidXLogRecPtr to say "continue from last record"
-                        */
+                       /* Read sequentially from now on */
                        startlsn = InvalidXLogRecPtr;
 
                        /*
-                        * The {begin_txn,change,commit_txn}_wrapper callbacks above will
-                        * store the description into our tuplestore.
+                        * Process the record.  Storage-level changes are ignored in
+                        * fast_forward mode, but other modules (such as snapbuilder)
+                        * might still have critical updates to do.
                         */
-                       if (record != NULL)
+                       if (gotrecord)
                                LogicalDecodingProcessRecord(ctx, ctx->reader);
 
-                       /* Stop once the moving point wanted by caller has been reached */
+                       /* Stop once the requested target has been reached */
                        if (moveto <= ctx->reader->EndRecPtr)
                                break;
 
                        CHECK_FOR_INTERRUPTS();
                }
 
+               /*
+                * Logical decoding could have clobbered CurrentResourceOwner during
+                * transaction management, so restore the executor's value.  (This is
+                * a kluge, but it's not worth cleaning up right now.)
+                */
                CurrentResourceOwner = old_resowner;
 
                if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
@@ -409,7 +430,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
                        LogicalConfirmReceivedLocation(moveto);
 
                        /*
-                        * If only the confirmed_flush_lsn has changed the slot won't get
+                        * If only the confirmed_flush LSN has changed the slot won't get
                          * marked as dirty by the above. Callers on the walsender
                          * interface are expected to keep track of their own progress and
                          * don't need it written out. But SQL-interface users cannot
-- 
2.11.0
