On Sat, Oct 14, 2023 at 10:45 AM Hayato Kuroda (Fujitsu)
<kuroda.hay...@fujitsu.com> wrote:
>
> Here is a new patch.
>
> Previously I wrote:
> > Based on the above idea, I made a new version of the patch in which some
> > functionality was exported from pg_resetwal. In this approach, pg_upgrade
> > itself removes the WAL and then creates the logical slots; pg_resetwal is
> > then called with a new option, --no-switch, which avoids switching a WAL
> > segment file. The option is only used for upgrading, so it is not described
> > in the docs or in usage(). This option is not required if pg_resetwal -o
> > does not discard WAL records. Please see the fork thread [1].
>
> But for now, these changes were reverted because changing the pg_resetwal -o
> behavior may be a bit risky. It has been in place for more than ten years, so
> we should be careful about modifying it.
> Also, I cannot come up with any problems if slots are created after
> pg_resetwal. Background processes would not generate decodable changes
> (listed in [1]), and BGworkers from extensions can be ignored [2].
> Depending on the discussion in the forked thread [3], and if it is accepted,
> we will apply them again.
>

Yeah, I think introducing additional complexity unless it is really
required sounds a bit scary to me as well. BTW, please find attached
some cosmetic changes.

One minor additional comment:
+# Initialize subscriber cluster
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init(allows_streaming => 'logical');

Why do we need to set wal_level as logical for subscribers?

-- 
With Regards,
Amit Kapila.
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 4144a43afd..cfa955a679 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -618,9 +618,9 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
                return;
 
        /*
-        * We can also skip decoding when in 'fast_forward' mode. This check must
-        * be last because we don't want to set that processing_required flag
-        * unnecessarily.
+        * We also skip decoding in 'fast_forward' mode. This check must be last
+        * because we don't want to set the processing_required flag unless
+        * we have a decodable message.
         */
        if (ctx->fast_forward)
        {
@@ -1307,8 +1307,8 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
                return true;
 
        /*
-        * We can also skip decoding when in 'fast_forward' mode. In passing set
-        * the 'processing_required' flag to indicate, were it not for this mode,
+        * We also skip decoding in 'fast_forward' mode. In passing set the
+        * 'processing_required' flag to indicate, were it not for this mode,
         * processing *would* have been required.
         */
        if (ctx->fast_forward)
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 32869a75ab..e02cd0fa44 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1953,9 +1953,9 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 }
 
 /*
- * Read to end of WAL starting from the decoding slot's restart_lsn. Return
- * true if any meaningful/decodable WAL records are encountered, otherwise
- * false.
+ * Read up to the end of WAL starting from the decoding slot's restart_lsn.
+ * Return true if any meaningful/decodable WAL records are encountered,
+ * otherwise false.
  *
  * Although this function is currently used only during pg_upgrade, there are
  * no reasons to restrict it, so IsBinaryUpgrade is not checked here.
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 2a831bc397..a3a8ade405 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -274,8 +274,8 @@ binary_upgrade_set_missing_value(PG_FUNCTION_ARGS)
  * Returns true if there are no decodable WAL records after the
  * confirmed_flush_lsn. Otherwise false.
  *
- * This is a special purpose function to ensure the given slot can be upgraded
- * without data loss.
+ * This is a special purpose function to ensure that the given slot can be
+ * upgraded without data loss.
  */
 Datum
 binary_upgrade_slot_has_pending_wal(PG_FUNCTION_ARGS)
@@ -294,16 +294,10 @@ binary_upgrade_slot_has_pending_wal(PG_FUNCTION_ARGS)
 
        slot_name = PG_GETARG_NAME(0);
 
-       /*
-        * Acquire the given slot. There should be no error because the caller has
-        * already checked the slot exists.
-        */
+       /* Acquire the given slot. */
        ReplicationSlotAcquire(NameStr(*slot_name), true);
 
-       /*
-        * It's caller's responsibility to check the health of the slot.  Upcoming
-        * functions assume the restart_lsn points to a valid record.
-        */
+       /* Slots must be valid as otherwise we won't be able to scan the WAL. */
        Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
 
        end_of_wal = GetFlushRecPtr(NULL);
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 123f47a81f..8f3f5585a4 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1541,10 +1541,8 @@ check_new_cluster_logical_replication_slots(void)
 /*
  * check_old_cluster_for_valid_slots()
  *
- * Verify that all the logical slots are usable and have consumed all the WAL
- * before shutdown. The check has already been done in
- * get_old_cluster_logical_slot_infos(), so this function reads the result and
- * reports to the user.
+ * Verify that all the logical slots are valid and have consumed all the WAL
+ * before shutdown.
  */
 static void
 check_old_cluster_for_valid_slots(bool live_check)
@@ -1607,7 +1605,7 @@ check_old_cluster_for_valid_slots(bool live_check)
                fclose(script);
 
                pg_log(PG_REPORT, "fatal");
-               pg_fatal("Your installation contains logical replication slots that cannot be upgraded.\n"
+               pg_fatal("Your installation contains invalid logical replication slots.\n"
                                 "These slots can't be copied, so this cluster cannot be upgraded.\n"
                                 "Consider removing invalid slots and/or consuming the pending WAL if any,\n"
                                 "and then restart the upgrade.\n"
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index c56769fe54..5494e69227 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -651,8 +651,8 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
        /*
         * Fetch the logical replication slot information. The check whether the
         * slot is considered caught up is done by an upgrade function. This
-        * regards the slot is caught up if any changes are not found while
-        * decoding. See binary_upgrade_slot_has_pending_wal().
+        * regards the slot as caught up if we don't find any decodable changes.
+        * See binary_upgrade_slot_has_pending_wal().
         *
         * Note that we can't ensure whether the slot is caught up during
         * live_check as the new WAL records could be generated.
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 7acdf31d02..3960af4036 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -190,7 +190,12 @@ main(int argc, char **argv)
        check_ok();
 
        /*
-        * If the old cluster has logical slots, migrate them to a new cluster.
+        * Migrate the logical slots to the new cluster.  Note that we need to do
+        * this after resetting WAL because otherwise the required WAL would be
+        * removed and slots would become unusable.  There is a possibility that
+        * background processes might generate some WAL before we could create the
+        * slots in the new cluster but we can ignore that WAL as that won't be
+        * required downstream.
         */
        if (count_old_cluster_logical_slots())
        {
@@ -890,7 +895,6 @@ create_logical_replication_slots(void)
                LogicalSlotInfoArr *slot_arr = &old_db->slot_arr;
                PGconn     *conn;
                PQExpBuffer query;
-               char            log_file_name[MAXPGPATH];
 
                /* Skip this database if there are no slots */
                if (slot_arr->nslots == 0)
@@ -899,9 +903,6 @@ create_logical_replication_slots(void)
                conn = connectToServer(&new_cluster, old_db->db_name);
                query = createPQExpBuffer();
 
-               snprintf(log_file_name, sizeof(log_file_name),
-                                DB_DUMP_LOG_FILE_MASK, old_db->db_oid);
-
                pg_log(PG_STATUS, "%s", old_db->db_name);
 
                for (int slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 355247a58b..f8258d7c28 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -110,11 +110,7 @@ typedef struct LogicalDecodingContext
        /* Are we processing the end LSN of a transaction? */
        bool            end_xact;
 
-       /*
-        * Did the logical decoding context require processing WALs?
-        *
-        * This flag is used only when in 'fast_forward' mode.
-        */
+       /* Do we need to process any change in 'fast_forward' mode? */
        bool            processing_required;
 } LogicalDecodingContext;
 
