Hi, Thomas, there's one point at the bottom wrt ConditionVariables that'd be interesting for you to comment on.
On 2023-01-05 16:15:39 -0500, Robert Haas wrote:
> On Tue, Jan 3, 2023 at 2:42 AM Drouvot, Bertrand
> <bertranddrouvot...@gmail.com> wrote:
> > Please find attached v36, tiny rebase due to 1de58df4fe.
>
> 0001 looks committable to me now, though we probably shouldn't do that
> unless we're pretty confident about shipping enough of the rest of
> this to accomplish something useful.

Cool!

ISTM that the ordering of patches isn't quite right later on: it doesn't make
sense to introduce working logical decoding without first fixing
WalSndWaitForWal() (i.e. patch 0006). What made you order the patches that
way?


0001:

> 4. We can't rely on the standby's relcache entries for this purpose in
> any way, because the WAL record that causes the problem might be
> replayed before the standby even reaches consistency.

The startup process can't access catalog contents in the first place, so the
consistency issue is secondary.

ISTM that the commit message omits a fairly significant portion of the change:
the introduction of indisusercatalog / the reason for its introduction.

Why is indisusercatalog stored as a "full" column, whereas for tables we store
the fact of being used as a catalog table in a reloption? I'm not averse to
moving to a full column, but then I think we should do the same for tables.

Earlier versions of the patches IIRC sourced the "catalogness" from the
relation. What led you to change that? I'm not saying it's wrong, just not
sure it's right either.

It'd be good to introduce cross-checks that indisusercatalog is set correctly.
RelationGetIndexList() seems like a good candidate.

I'd probably split the introduction of indisusercatalog into a separate patch.

Why was HEAP_DEFAULT_USER_CATALOG_TABLE introduced in this patch?

I wonder if we instead should compute a relation's "catalogness" in the
relcache. That would have the advantage of making
RelationIsUsedAsCatalogTable() cheaper and of working for all kinds of
relations.

VISIBILITYMAP_ON_CATALOG_ACCESSIBLE_IN_LOGICAL_DECODING is a very long
identifier. Given that the field in the xlog records is just named
isCatalogRel, any reason to not just name it correspondingly?


0002:

> +/*
> + * Helper for InvalidateConflictingLogicalReplicationSlot -- acquires the given slot
> + * and mark it invalid, if necessary and possible.
> + *
> + * Returns whether ReplicationSlotControlLock was released in the interim (and
> + * in that case we're not holding the lock at return, otherwise we are).
> + *
> + * This is inherently racy, because we release the LWLock
> + * for syscalls, so caller must restart if we return true.
> + */
> +static bool
> +InvalidatePossiblyConflictingLogicalReplicationSlot(ReplicationSlot *s, TransactionId xid)

This appears to be a near-complete copy of InvalidatePossiblyObsoleteSlot().
I don't think we should have two versions of that non-trivial code. Seems we
could just have an additional argument for InvalidatePossiblyObsoleteSlot()?

> +            ereport(LOG,
> +                    (errmsg("invalidating slot \"%s\" because it conflicts with recovery",
> +                            NameStr(slotname))));
> +

I think this should report more details, similar to what
InvalidateObsoleteReplicationSlots() does.
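Roughly what I have in mind, as a sketch only (whether the conflicting xid is
still conveniently available at that point would need checking; `slotname` and
`xid` are taken from the patch context above):

    /*
     * Sketch: also report which row-removal horizon caused the invalidation,
     * analogous to the restart_lsn detail that
     * InvalidateObsoleteReplicationSlots() emits.
     */
    ereport(LOG,
            (errmsg("invalidating slot \"%s\" because it conflicts with recovery",
                    NameStr(slotname)),
             errdetail("The slot conflicted with xid horizon %u.", xid)));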
Avoid "cannot get changes" wording in > this > + * restart_lsn is valid or both xmin and catalog_xmin are valid. > + * Avoid "cannot get changes" wording in this > * errmsg because that'd be confusingly ambiguous about no > changes > * being available. > */ > - if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) > + if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn) > + || (!TransactionIdIsValid(MyReplicationSlot->data.xmin) > + && > !TransactionIdIsValid(MyReplicationSlot->data.catalog_xmin))) > ereport(ERROR, > > (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), > errmsg("can no longer get changes from > replication slot \"%s\"", Hm. Feels like we should introduce a helper like SlotIsInvalidated() instead of having this condition in a bunch of places. > + if (!TransactionIdIsValid(MyReplicationSlot->data.xmin) > + && !TransactionIdIsValid(MyReplicationSlot->data.catalog_xmin)) > + ereport(ERROR, > + > (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), > + errmsg("cannot read from logical replication > slot \"%s\"", > + cmd->slotname), > + errdetail("This slot has been invalidated > because it was conflicting with recovery."))); > + This is a more precise error than the one in pg_logical_slot_get_changes_guts(). I think both places should output the same error. ISTM that the relevant code should be in CreateDecodingContext(). Imo the code to deal with the WAL version of this has been misplaced... > --- a/src/backend/storage/ipc/procarray.c > +++ b/src/backend/storage/ipc/procarray.c > @@ -3477,6 +3477,10 @@ SignalVirtualTransaction(VirtualTransactionId vxid, > ProcSignalReason sigmode, > > GET_VXID_FROM_PGPROC(procvxid, *proc); > > + /* > + * Note: vxid.localTransactionId can be invalid, which means the > + * request is to signal the pid that is not running a > transaction. > + */ > if (procvxid.backendId == vxid.backendId && > procvxid.localTransactionId == vxid.localTransactionId) > { I can't really parse the comment. > @@ -500,6 +502,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId > snapshotConflictHorizon, > > PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, > > WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT, > > true); > + > + if (isCatalogRel) > + InvalidateConflictingLogicalReplicationSlots(locator.dbOid, > snapshotConflictHorizon); > } Might be worth checking if wal_level >= logical before the somewhat expensive InvalidateConflictingLogicalReplicationSlots(). > @@ -3051,6 +3054,25 @@ RecoveryConflictInterrupt(ProcSignalReason reason) > case PROCSIG_RECOVERY_CONFLICT_LOCK: > case PROCSIG_RECOVERY_CONFLICT_TABLESPACE: > case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: > + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: > + /* > + * For conflicts that require a logical slot to > be invalidated, the > + * requirement is for the signal receiver to > release the slot, > + * so that it could be invalidated by the > signal sender. So for > + * normal backends, the transaction should be > aborted, just > + * like for other recovery conflicts. But if > it's walsender on > + * standby, then it has to be killed so as to > release an > + * acquired logical slot. > + */ > + if (am_cascading_walsender && > + reason == > PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT && > + MyReplicationSlot && > SlotIsLogical(MyReplicationSlot)) > + { > + RecoveryConflictPending = true; > + QueryCancelPending = true; > + InterruptPending = true; > + break; > + } > > /* > * If we aren't in a transaction any longer > then ignore. Why does the walsender need to be killed? 
0003:

> Allow a logical slot to be created on standby. Restrict its usage
> or its creation if wal_level on primary is less than logical.
> During slot creation, it's restart_lsn is set to the last replayed
> LSN. Effectively, a logical slot creation on standby waits for an
> xl_running_xact record to arrive from primary. Conflicting slots
> would be handled in next commits.

I think the commit message might be outdated; the next commit is a test.

> +        /*
> +         * Replay pointer may point one past the end of the record. If that
> +         * is a XLOG page boundary, it will not be a valid LSN for the
> +         * start of a record, so bump it up past the page header.
> +         */
> +        if (!XRecOffIsValid(restart_lsn))
> +        {
> +            if (restart_lsn % XLOG_BLCKSZ != 0)
> +                elog(ERROR, "invalid replay pointer");
> +
> +            /* For the first page of a segment file, it's a long header */
> +            if (XLogSegmentOffset(restart_lsn, wal_segment_size) == 0)
> +                restart_lsn += SizeOfXLogLongPHD;
> +            else
> +                restart_lsn += SizeOfXLogShortPHD;
> +        }

Is this actually needed? Supposedly xlogreader can work just fine with an
address at the start of a page?

    /*
     * Caller supplied a position to start at.
     *
     * In this case, NextRecPtr should already be pointing either to a
     * valid record starting position or alternatively to the beginning of
     * a page. See the header comments for XLogBeginRead.
     */
    Assert(RecPtr % XLOG_BLCKSZ == 0 || XRecOffIsValid(RecPtr));

>      /*
> -     * Since logical decoding is only permitted on a primary server, we know
> -     * that the current timeline ID can't be changing any more. If we did this
> -     * on a standby, we'd have to worry about the values we compute here
> -     * becoming invalid due to a promotion or timeline change.
> +     * Since logical decoding is also permitted on a standby server, we need
> +     * to check if the server is in recovery to decide how to get the current
> +     * timeline ID (so that it also cover the promotion or timeline change cases).
>       */
> +    if (!RecoveryInProgress())
> +        currTLI = GetWALInsertionTimeLine();
> +    else
> +        GetXLogReplayRecPtr(&currTLI);
> +

This seems to remove some content from the !recovery case.

It's a bit odd that here RecoveryInProgress() is used, whereas further down
am_cascading_walsender is used.

> @@ -3074,10 +3078,12 @@ XLogSendLogical(void)
>       * If first time through in this session, initialize flushPtr. Otherwise,
>       * we only need to update flushPtr if EndRecPtr is past it.
>       */
> -    if (flushPtr == InvalidXLogRecPtr)
> -        flushPtr = GetFlushRecPtr(NULL);
> -    else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
> -        flushPtr = GetFlushRecPtr(NULL);
> +    if (flushPtr == InvalidXLogRecPtr ||
> +        logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
> +    {
> +        flushPtr = (am_cascading_walsender ?
> +                    GetStandbyFlushRecPtr(NULL) : GetFlushRecPtr(NULL));
> +    }
>  
>      /* If EndRecPtr is still past our flushPtr, it means we caught up. */
>      if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)

A short if inside a normal if seems ugly to me.
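What I'd find easier to read is something like the below (untested, just
illustrating the shape; intended to behave the same as the hunk above):

    /* avoid the ternary inside the if block */
    if (flushPtr == InvalidXLogRecPtr ||
        logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
    {
        if (am_cascading_walsender)
            flushPtr = GetStandbyFlushRecPtr(NULL);
        else
            flushPtr = GetFlushRecPtr(NULL);
    }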
0004:

> @@ -3037,6 +3037,43 @@ $SIG{TERM} = $SIG{INT} = sub {
>  
>  =pod
>  
> +=item $node->create_logical_slot_on_standby(self, master, slot_name, dbname)
> +
> +Create logical replication slot on given standby
> +
> +=cut
> +
> +sub create_logical_slot_on_standby
> +{

Any reason this has to be standby specific?

> +    # Now arrange for the xl_running_xacts record for which pg_recvlogical
> +    # is waiting.
> +    $master->safe_psql('postgres', 'CHECKPOINT');
> +

Hm, that's quite expensive. Perhaps worth adding a C helper that can do that
for us instead? This will likely also be needed in real applications after
all. (A rough sketch of such a helper is further down.)

> +    print "starting pg_recvlogical\n";

I don't think tests should just print somewhere. Either diag() or note()
should be used.

> +    if ($wait)
> +    # make sure activeslot is in use
> +    {
> +        $node_standby->poll_query_until('testdb',
> +            "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'activeslot' AND active_pid IS NOT NULL)"
> +        ) or die "slot never became active";
> +    }

That comment placement imo is quite odd.

> +# test if basic decoding works
> +is(scalar(my @foobar = split /^/m, $result),
> +    14, 'Decoding produced 14 rows');

Maybe mention that it's 2 transactions + 10 rows?

> +$node_primary->wait_for_catchup($node_standby, 'replay', $node_primary->lsn('flush'));

There's enough copies of this that I wonder if we shouldn't introduce a
Cluster.pm level helper for this.

> +print "waiting to replay $endpos\n";

See above.

> +my $stdout_recv = $node_standby->pg_recvlogical_upto(
> +    'testdb', 'activeslot', $endpos, 180,
> +    'include-xids' => '0',
> +    'skip-empty-xacts' => '1');

I don't think this should use a hardcoded 180 but
$PostgreSQL::Test::Utils::timeout_default.

> +# One way to reproduce recovery conflict is to run VACUUM FULL with
> +# hot_standby_feedback turned off on the standby.
> +$node_standby->append_conf('postgresql.conf',q[
> +hot_standby_feedback = off
> +]);
> +$node_standby->restart;

IIRC a reload should suffice.

> +# This should trigger the conflict
> +$node_primary->safe_psql('testdb', 'VACUUM FULL');

Can we do something cheaper than rewriting the entire database? Seems
rewriting a single table ought to be sufficient?

I think it'd also be good to test that rewriting a non-catalog table doesn't
trigger an issue.

> +##################################################
> +# Recovery conflict: Invalidate conflicting slots, including in-use slots
> +# Scenario 2: conflict due to row removal with hot_standby_feedback off.
> +##################################################
> +
> +# get the position to search from in the standby logfile
> +my $logstart = -s $node_standby->logfile;
> +
> +# drop the logical slots
> +$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('inactiveslot')]);
> +$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('activeslot')]);
> +
> +create_logical_slots();
> +
> +# One way to produce recovery conflict is to create/drop a relation and launch a vacuum
> +# with hot_standby_feedback turned off on the standby.
> +$node_standby->append_conf('postgresql.conf',q[
> +hot_standby_feedback = off
> +]);
> +$node_standby->restart;
> +# ensure walreceiver feedback off by waiting for expected xmin and
> +# catalog_xmin on primary. Both should be NULL since hs_feedback is off
> +wait_for_xmins($node_primary, $primary_slotname,
> +    "xmin IS NULL AND catalog_xmin IS NULL");
> +
> +$handle = make_slot_active(1);

This is a fair bit of repeated setup, maybe put it into a function?
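Re the C helper for generating the xl_running_xacts record mentioned above: a
rough sketch of what I'm thinking of (hypothetical function name; it would
obviously also need a pg_proc.dat entry and docs):

    /*
     * Hypothetical SQL-callable wrapper around LogStandbySnapshot(), so that
     * tests (and applications creating slots on a standby) don't need a full
     * CHECKPOINT on the primary just to get a running-xacts record logged.
     */
    Datum
    pg_log_standby_snapshot(PG_FUNCTION_ARGS)
    {
        XLogRecPtr  recptr;

        if (RecoveryInProgress())
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     errmsg("recovery is in progress"),
                     errhint("WAL cannot be generated during recovery.")));

        recptr = LogStandbySnapshot();

        PG_RETURN_LSN(recptr);
    }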
I think it'd be good to test that ongoing decoding via the SQL interface also
gets handled correctly. But it might be too hard to do reliably.

> +##################################################
> +# Test standby promotion and logical decoding behavior
> +# after the standby gets promoted.
> +##################################################
> +

I think this also should test the streaming / walsender case.


0006:

> diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
> index bc3c3eb3e7..98c96eb864 100644
> --- a/src/backend/access/transam/xlogrecovery.c
> +++ b/src/backend/access/transam/xlogrecovery.c
> @@ -358,6 +358,9 @@ typedef struct XLogRecoveryCtlData
>      RecoveryPauseState recoveryPauseState;
>      ConditionVariable recoveryNotPausedCV;
>  
> +    /* Replay state (see getReplayedCV() for more explanation) */
> +    ConditionVariable replayedCV;
> +
>      slock_t    info_lck;    /* locks shared variables shown above */
>  } XLogRecoveryCtlData;

getReplayedCV() doesn't seem to fit into any of the naming schemes in use for
xlogrecovery.h.

> -     * Sleep until something happens or we time out. Also wait for the
> -     * socket becoming writable, if there's still pending output.
> +     * When not in recovery, sleep until something happens or we time out.
> +     * Also wait for the socket becoming writable, if there's still pending output.

Hm. Is there a problem with not handling the becoming-writable case in the
in-recovery case?

> +        else
> +        /*
> +         * We are in the logical decoding on standby case.
> +         * We are waiting for the startup process to replay wal record(s) using
> +         * a timeout in case we are requested to stop.
> +         */
> +        {

I don't think pgindent will like that formatting...

> +            ConditionVariablePrepareToSleep(replayedCV);
> +            ConditionVariableTimedSleep(replayedCV, 1000,
> +                                        WAIT_EVENT_WAL_SENDER_WAIT_REPLAY);
> +        }

I think this is racy, see ConditionVariablePrepareToSleep()'s comment:

 * Caution: "before entering the loop" means you *must* test the exit
 * condition between calling ConditionVariablePrepareToSleep and calling
 * ConditionVariableSleep.  If that is inconvenient, omit calling
 * ConditionVariablePrepareToSleep.

Basically, the ConditionVariablePrepareToSleep() should be before the loop
body.

I don't think the fixed timeout here makes sense. For one, we need to wake up
based on WalSndComputeSleeptime(), otherwise we're ignoring
wal_sender_timeout (which can be quite small). It's also just way too frequent
- we're trying to avoid constantly waking up unnecessarily.

Perhaps we could deal with the pq_is_send_pending() issue by having a version
of ConditionVariableTimedSleep() that accepts a WaitEventSet?

Greetings,

Andres Freund
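PS: to make the ConditionVariable point above concrete, the shape I have in
mind is roughly the following (untested sketch; `loc` is WalSndWaitForWal()'s
target LSN, `replayedCV` the pointer obtained via the patch's getReplayedCV(),
and the exit condition is only illustrative):

    /*
     * Sketch: prepare once before the loop, re-test the exit condition
     * between prepare and every sleep (that's what makes it non-racy), and
     * derive the timeout from WalSndComputeSleeptime() so that
     * wal_sender_timeout is still honored.
     */
    ConditionVariablePrepareToSleep(replayedCV);

    for (;;)
    {
        long        sleeptime;

        /* the exit condition must be re-checked after PrepareToSleep() */
        if (GetXLogReplayRecPtr(NULL) >= loc || got_STOPPING)
            break;

        sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

        ConditionVariableTimedSleep(replayedCV, sleeptime,
                                    WAIT_EVENT_WAL_SENDER_WAIT_REPLAY);
    }

    ConditionVariableCancelSleep();

That still doesn't address the pq_is_send_pending() part, hence the
WaitEventSet thought above.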