On Fri, Feb 13, 2015 at 4:57 PM, Michael Paquier wrote: > Moved patch to CF 2015-02 to not lose track of it, also because it does not > seem it received a proper review.
This patch does not apply anymore, so attached is a rebased version. The comments mentioned here have not been addressed: http://www.postgresql.org/message-id/54a7bf61.9080...@vmware.com Also, what kind of tests have been done? Logical decoding cannot be used while a node is in recovery. Regards, -- Michael
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4a20569..3036ce6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -216,7 +216,8 @@ static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, Transac
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
-
+static XLogRecPtr GetLatestRequestPtr(void);
+static TimeLineID ReadSendTimeLine(TimeLineID tli);
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -535,8 +536,6 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->timeline != 0)
 	{
-		XLogRecPtr	switchpoint;
-
 		sendTimeLine = cmd->timeline;
 		if (sendTimeLine == ThisTimeLineID)
 		{
@@ -545,18 +544,13 @@ StartReplication(StartReplicationCmd *cmd)
 		}
 		else
 		{
-			List	   *timeLineHistory;
-
 			sendTimeLineIsHistoric = true;
 
 			/*
 			 * Check that the timeline the client requested for exists, and
 			 * the requested start location is on that timeline.
 			 */
-			timeLineHistory = readTimeLineHistory(ThisTimeLineID);
-			switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
-										 &sendTimeLineNextTLI);
-			list_free_deep(timeLineHistory);
+			(void) ReadSendTimeLine(cmd->timeline);
 
 			/*
 			 * Found the requested timeline in the history. Check that
@@ -576,8 +570,8 @@ StartReplication(StartReplicationCmd *cmd)
 			 * that's older than the switchpoint, if it's still in the same
 			 * WAL segment.
 			 */
-			if (!XLogRecPtrIsInvalid(switchpoint) &&
-				switchpoint < cmd->startpoint)
+			if (!XLogRecPtrIsInvalid(sendTimeLineValidUpto) &&
+				sendTimeLineValidUpto < cmd->startpoint)
 			{
 				ereport(ERROR,
 						(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
@@ -586,10 +580,9 @@ StartReplication(StartReplicationCmd *cmd)
 								cmd->timeline),
 						 errdetail("This server's history forked from timeline %u at %X/%X.",
 								   cmd->timeline,
-								   (uint32) (switchpoint >> 32),
-								   (uint32) (switchpoint))));
+								   (uint32) (sendTimeLineValidUpto >> 32),
+								   (uint32) (sendTimeLineValidUpto))));
 			}
-			sendTimeLineValidUpto = switchpoint;
 		}
 	}
 	else
@@ -928,6 +921,7 @@ static void
 StartLogicalReplication(StartReplicationCmd *cmd)
 {
 	StringInfoData buf;
+	List	   *timeLineHistory;
 
 	/* make sure that our requirements are still fulfilled */
 	CheckLogicalDecodingRequirements();
@@ -940,6 +934,8 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * Force a disconnect, so that the decoding code doesn't need to care
 	 * about an eventual switch from running in recovery, to running in a
 	 * normal environment. Client code is expected to handle reconnects.
+	 * This covers the race condition where we are promoted halfway
+	 * through starting up.
 	 */
 	if (am_cascading_walsender && !RecoveryInProgress())
 	{
@@ -948,6 +944,13 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 		walsender_ready_to_stop = true;
 	}
 
+	/*
+	 * On a cascading walsender, GetStandbyFlushRecPtr() also updates
+	 * ThisTimeLineID, which the timeline lookup below relies upon.
+	 */
+	if (am_cascading_walsender)
+		(void) GetStandbyFlushRecPtr();
+
 	WalSndSetState(WALSNDSTATE_CATCHUP);
 
 	/* Send a CopyBothResponse message, and start streaming */
@@ -974,6 +977,24 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	logical_startptr = MyReplicationSlot->data.restart_lsn;
 
 	/*
+	 * Find the timeline for the start location, or throw an error.
+	 *
+	 * Logical replication relies upon replication slots. Each slot has a
+	 * single timeline history baked into it, so this should be easy.
+	 */
+	timeLineHistory = readTimeLineHistory(ThisTimeLineID);
+	sendTimeLine = tliOfPointInHistory(logical_startptr, timeLineHistory);
+	sendTimeLineIsHistoric = (sendTimeLine != ThisTimeLineID);
+	sendTimeLineValidUpto = InvalidXLogRecPtr;
+	/* A historic timeline can only be streamed up to its switch point. */
+	if (sendTimeLineIsHistoric)
+		sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, timeLineHistory,
+											   &sendTimeLineNextTLI);
+	list_free_deep(timeLineHistory);
+
+	streamingDoneSending = streamingDoneReceiving = false;
+
+	/*
 	 * Report the location after which we'll send out further commits as the
 	 * current sentPtr.
 	 */
@@ -2179,93 +2200,10 @@ XLogSendPhysical(void)
 		return;
 	}
 
-	/* Figure out how far we can safely send the WAL. */
-	if (sendTimeLineIsHistoric)
-	{
-		/*
-		 * Streaming an old timeline that's in this server's history, but is
-		 * not the one we're currently inserting or replaying. It can be
-		 * streamed up to the point where we switched off that timeline.
-		 */
-		SendRqstPtr = sendTimeLineValidUpto;
-	}
-	else if (am_cascading_walsender)
-	{
-		/*
-		 * Streaming the latest timeline on a standby.
-		 *
-		 * Attempt to send all WAL that has already been replayed, so that we
-		 * know it's valid. If we're receiving WAL through streaming
-		 * replication, it's also OK to send any WAL that has been received
-		 * but not replayed.
-		 *
-		 * The timeline we're recovering from can change, or we can be
-		 * promoted. In either case, the current timeline becomes historic. We
-		 * need to detect that so that we don't try to stream past the point
-		 * where we switched to another timeline. We check for promotion or
-		 * timeline switch after calculating FlushPtr, to avoid a race
-		 * condition: if the timeline becomes historic just after we checked
-		 * that it was still current, it's still be OK to stream it up to the
-		 * FlushPtr that was calculated before it became historic.
-		 */
-		bool		becameHistoric = false;
-
-		SendRqstPtr = GetStandbyFlushRecPtr();
-
-		if (!RecoveryInProgress())
-		{
-			/*
-			 * We have been promoted. RecoveryInProgress() updated
-			 * ThisTimeLineID to the new current timeline.
-			 */
-			am_cascading_walsender = false;
-			becameHistoric = true;
-		}
-		else
-		{
-			/*
-			 * Still a cascading standby. But is the timeline we're sending
-			 * still the one recovery is recovering from? ThisTimeLineID was
-			 * updated by the GetStandbyFlushRecPtr() call above.
-			 */
-			if (sendTimeLine != ThisTimeLineID)
-				becameHistoric = true;
-		}
-
-		if (becameHistoric)
-		{
-			/*
-			 * The timeline we were sending has become historic. Read the
-			 * timeline history file of the new timeline to see where exactly
-			 * we forked off from the timeline we were sending.
-			 */
-			List	   *history;
-
-			history = readTimeLineHistory(ThisTimeLineID);
-			sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
-
-			Assert(sendTimeLine < sendTimeLineNextTLI);
-			list_free_deep(history);
-
-			sendTimeLineIsHistoric = true;
-
-			SendRqstPtr = sendTimeLineValidUpto;
-		}
-	}
-	else
-	{
-		/*
-		 * Streaming the current timeline on a master.
-		 *
-		 * Attempt to send all data that's already been written out and
-		 * fsync'd to disk. We cannot go further than what's been written out
-		 * given the current implementation of XLogRead(). And in any case
-		 * it's unsafe to send WAL that is not securely down to disk on the
-		 * master: if the master subsequently crashes and restarts, slaves
-		 * must not have applied any WAL that gets lost on the master.
-		 */
-		SendRqstPtr = GetFlushRecPtr();
-	}
+	/*
+	 * Get the SendRqstPtr and follow any timeline changes.
+	 */
+	SendRqstPtr = GetLatestRequestPtr();
 
 	/*
 	 * If this is a historic timeline and we've reached the point where we
@@ -2436,6 +2374,42 @@ XLogSendLogical(void)
 			WalSndCaughtUp = true;
 	}
 
+	/*
+	 * We don't need the send request pointer itself, but we want to follow
+	 * timeline changes and set sendTimeLineIsHistoric if required.
+	 */
+	if (!sendTimeLineIsHistoric)
+		(void) GetLatestRequestPtr();
+
+	/*
+	 * If this is a historic timeline and we've reached the point where we
+	 * forked to the next timeline, switch to new timeline.
+	 */
+	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
+	{
+		/* close the current file. */
+		if (sendFile >= 0)
+			close(sendFile);
+		sendFile = -1;
+
+		elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
+			 (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
+			 (uint32) (sentPtr >> 32), (uint32) sentPtr);
+
+		/*
+		 * Did we reach the current timeline yet? If not, switch to the
+		 * next one and follow that to its endpoint.
+		 */
+		if (sendTimeLineNextTLI == ThisTimeLineID)
+		{
+			sendTimeLineIsHistoric = false;
+			sendTimeLine = sendTimeLineNextTLI;
+			sendTimeLineValidUpto = InvalidXLogRecPtr;
+		}
+		else
+			sendTimeLine = ReadSendTimeLine(sendTimeLineNextTLI);
+	}
+
 	/* Update shared memory status */
 	{
 		/* use volatile pointer to prevent code rearrangement */
@@ -2445,6 +2419,8 @@ XLogSendLogical(void)
 		walsnd->sentPtr = sentPtr;
 		SpinLockRelease(&walsnd->mutex);
 	}
+
+	/* ps display updated by plugin, if desired */
 }
 
 /*
@@ -2947,3 +2923,127 @@ GetOldestWALSendPointer(void)
 }
 
 #endif
+
+/*
+ * GetLatestRequestPtr
+ *
+ * Return how far WAL can currently be sent safely on sendTimeLine.  If that
+ * timeline has become historic (promotion, or a timeline switch during
+ * recovery), sendTimeLineIsHistoric, sendTimeLineValidUpto and
+ * sendTimeLineNextTLI are updated, and after a promotion
+ * am_cascading_walsender is cleared.
+ */
+static XLogRecPtr
+GetLatestRequestPtr(void)
+{
+	XLogRecPtr	SendRqstPtr;
+
+	/* Figure out how far we can safely send the WAL. */
+	if (sendTimeLineIsHistoric)
+	{
+		/*
+		 * Streaming an old timeline that's in this server's history, but is
+		 * not the one we're currently inserting or replaying. It can be
+		 * streamed up to the point where we switched off that timeline.
+		 */
+		SendRqstPtr = sendTimeLineValidUpto;
+	}
+	else if (am_cascading_walsender)
+	{
+		/*
+		 * Streaming the latest timeline on a standby.
+		 *
+		 * Attempt to send all WAL that has already been replayed, so that we
+		 * know it's valid. If we're receiving WAL through streaming
+		 * replication, it's also OK to send any WAL that has been received
+		 * but not replayed.
+		 *
+		 * The timeline we're recovering from can change, or we can be
+		 * promoted. In either case, the current timeline becomes historic. We
+		 * need to detect that so that we don't try to stream past the point
+		 * where we switched to another timeline. We check for promotion or
+		 * timeline switch after calculating FlushPtr, to avoid a race
+		 * condition: if the timeline becomes historic just after we checked
+		 * that it was still current, it's still OK to stream it up to the
+		 * FlushPtr that was calculated before it became historic.
+		 */
+		bool		becameHistoric = false;
+
+		SendRqstPtr = GetStandbyFlushRecPtr();
+
+		if (!RecoveryInProgress())
+		{
+			/*
+			 * We have been promoted. RecoveryInProgress() updated
+			 * ThisTimeLineID to the new current timeline.
+			 */
+			am_cascading_walsender = false;
+			becameHistoric = true;
+		}
+		else
+		{
+			/*
+			 * Still a cascading standby. But is the timeline we're sending
+			 * still the one recovery is recovering from? ThisTimeLineID was
+			 * updated by the GetStandbyFlushRecPtr() call above.
+			 */
+			if (sendTimeLine != ThisTimeLineID)
+				becameHistoric = true;
+		}
+
+		if (becameHistoric)
+		{
+			/*
+			 * The timeline we were sending has become historic. Read the
+			 * timeline history file of the new timeline to see where exactly
+			 * we forked off from the timeline we were sending, i.e. look up
+			 * sendTimeLine, not the new ThisTimeLineID.
+			 */
+			(void) ReadSendTimeLine(sendTimeLine);
+
+			sendTimeLineIsHistoric = true;
+
+			SendRqstPtr = sendTimeLineValidUpto;
+		}
+	}
+	else
+	{
+		/*
+		 * Streaming the current timeline on a master.
+		 *
+		 * Attempt to send all data that's already been written out and
+		 * fsync'd to disk. We cannot go further than what's been written out
+		 * given the current implementation of XLogRead(). And in any case
+		 * it's unsafe to send WAL that is not securely down to disk on the
+		 * master: if the master subsequently crashes and restarts, slaves
+		 * must not have applied any WAL that gets lost on the master.
+		 */
+		SendRqstPtr = GetFlushRecPtr();
+	}
+
+	return SendRqstPtr;
+}
+
+/*
+ * ReadSendTimeLine
+ *
+ * Look up timeline 'tli' in the history of the current timeline
+ * (ThisTimeLineID), setting sendTimeLineValidUpto to the point where this
+ * server's history switched off 'tli' and sendTimeLineNextTLI to the
+ * timeline that followed it.  'tli' must therefore be a historic timeline.
+ * Returns 'tli', so that callers can assign it to sendTimeLine.
+ */
+static TimeLineID
+ReadSendTimeLine(TimeLineID tli)
+{
+	List	   *history;
+
+	history = readTimeLineHistory(ThisTimeLineID);
+	sendTimeLineValidUpto = tliSwitchPoint(tli, history,
+										   &sendTimeLineNextTLI);
+
+	Assert(tli < sendTimeLineNextTLI);
+	list_free_deep(history);
+
+	return tli;
+}
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers