On Mon, Jul 11, 2011 at 10:26 AM, Fujii Masao <masao.fu...@gmail.com> wrote:
> On Mon, Jul 11, 2011 at 3:30 AM, Josh Berkus <j...@agliodbs.com> wrote:
>> Do you think you'll submit a new version of the patch this commitfest?
>
> Yes. I'm now updating the patch according to Simon's comments.
> I will submit it today.
Attached is the updated version which addresses all the issues raised by Simon.

> The risk you describe already exists in current code.
>
> I regard it as a non-risk. The unlink() and the rename() are executed
> consecutively, so the gap between them is small, so the chance of a
> SIGKILL in that gap at the same time as losing the archive seems low,
> and we can always get that file from the master again if we are
> streaming. Any code you add to "fix" this will get executed so rarely
> it probably won't work when we need it to.
>
> In the current scheme we restart archiving from the last restartpoint,
> which exists only on the archive. This new patch improves upon this by
> keeping the most recent files locally, so we are less exposed in the
> case of archive unavailability. So this patch already improves things
> and we don't need any more than that. No extra code please, IMHO.

Yes, I added no extra code for the risk I raised upthread.

> In #2, there is another problem; walsender might have the pre-existing file
> open, so the startup process would need to request walsenders to close the
> file before removing (or renaming) it, wait for new file to appear and open it
> again.

I implemented this.

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
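For readers who want the walsender reload request in isolation, the handshake can be sketched roughly as below. This is only an illustration and not the patch's code: the patch sets a `reload` flag in each `WalSnd` slot under that slot's spinlock, whereas this sketch assumes a C11 atomic flag instead, and `SenderSlot`, `request_reload` and `consume_reload` are hypothetical names.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for one walsender's shared-memory slot. */
typedef struct SenderSlot
{
    int         pid;     /* 0 means the slot is unused */
    atomic_bool reload;  /* set when the open segment was replaced */
} SenderSlot;

/* Startup-process side: ask every active sender to reopen its segment. */
void
request_reload(SenderSlot *slots, size_t nslots)
{
    assert(slots != NULL || nslots == 0);

    for (size_t i = 0; i < nslots; i++)
    {
        if (slots[i].pid == 0)
            continue;            /* skip unused slots */
        atomic_store(&slots[i].reload, true);
    }
}

/*
 * Sender side: test and clear the flag in one step.  A true result
 * means the sender should close its file and read the data again.
 */
bool
consume_reload(SenderSlot *slot)
{
    assert(slot != NULL);
    return atomic_exchange(&slot->reload, false);
}
```

The sender checks the flag after it has filled its buffer, so a segment that was replaced in the middle of a read is caught and the read is retried from the start.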
*** a/doc/src/sgml/config.sgml --- b/doc/src/sgml/config.sgml *************** *** 1949,1954 **** SET ENABLE_SEQSCAN TO OFF; --- 1949,1956 ---- The values of these parameters on standby servers are irrelevant, although you may wish to set them there in preparation for the possibility of a standby becoming the master. + Some of them need to be set in the standby for cascade replication + (see <xref linkend="cascade-replication">). </para> <variablelist> *************** *** 2019,2025 **** SET ENABLE_SEQSCAN TO OFF; doesn't keep any extra segments for standby purposes, so the number of old WAL segments available to standby servers is a function of the location of the previous checkpoint and status of WAL ! archiving. This parameter has no effect on restartpoints. This parameter can only be set in the <filename>postgresql.conf</> file or on the server command line. </para> --- 2021,2027 ---- doesn't keep any extra segments for standby purposes, so the number of old WAL segments available to standby servers is a function of the location of the previous checkpoint and status of WAL ! archiving. This parameter can only be set in the <filename>postgresql.conf</> file or on the server command line. </para> *************** *** 2121,2127 **** SET ENABLE_SEQSCAN TO OFF; synchronous replication is enabled, individual transactions can be configured not to wait for replication by setting the <xref linkend="guc-synchronous-commit"> parameter to ! <literal>local</> or <literal>off</>. </para> <para> This parameter can only be set in the <filename>postgresql.conf</> --- 2123,2130 ---- synchronous replication is enabled, individual transactions can be configured not to wait for replication by setting the <xref linkend="guc-synchronous-commit"> parameter to ! <literal>local</> or <literal>off</>. This parameter has no effect on ! cascade replication. 
</para> <para> This parameter can only be set in the <filename>postgresql.conf</> *** a/doc/src/sgml/high-availability.sgml --- b/doc/src/sgml/high-availability.sgml *************** *** 877,884 **** primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass' --- 877,921 ---- network delay, or that the standby is under heavy load. </para> </sect3> + </sect2> + + <sect2 id="cascade-replication"> + <title>Cascade Replication</title> + <indexterm zone="high-availability"> + <primary>Cascade Replication</primary> + </indexterm> + <para> + Cascade replication feature allows the standby to accept the replication + connections and stream WAL records to another standbys. This is useful + for reducing the number of standbys connecting to the master and reducing + the overhead of the master, when you have many standbys. + </para> + <para> + The cascading standby sends not only WAL records received from the + master but also those restored from the archive. So even if the replication + connection in higher level is terminated, you can continue cascade replication. + </para> + <para> + Cascade replication is asynchronous. Note that synchronous replication + (see <xref linkend="synchronous-replication">) has no effect on cascade + replication. + </para> + <para> + Promoting the cascading standby terminates all the cascade replication + connections which it uses. This is because the timeline becomes different + between standbys, and they cannot continue replication any more. + </para> + <para> + To use cascade replication, set up the cascading standby so that it can + accept the replication connections, i.e., set <varname>max_wal_senders</>, + <varname>hot_standby</> and authentication option (see + <xref linkend="streaming-replication"> and <xref linkend="hot-standby">). + Also set <varname>primary_conninfo</> in the cascaded standby to point + to the cascading standby. 
+ </para> </sect2> + <sect2 id="synchronous-replication"> <title>Synchronous Replication</title> *** a/src/backend/access/transam/xlog.c --- b/src/backend/access/transam/xlog.c *************** *** 446,451 **** typedef struct XLogCtlData --- 446,453 ---- XLogRecPtr recoveryLastRecPtr; /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */ TimestampTz recoveryLastXTime; + /* end of the last record restored from the archive */ + XLogRecPtr restoreLastRecPtr; /* Are we requested to pause recovery? */ bool recoveryPause; *************** *** 612,617 **** static void CheckRequiredParameterValues(void); --- 614,620 ---- static void XLogReportParameters(void); static void LocalSetXLogInsertAllowed(void); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); + static void WalKeepSegments(uint32 *logId, uint32 *logSeg, XLogRecPtr recptr); static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites, XLogRecPtr *lsn, BkpBlock *bkpb); *************** *** 2729,2734 **** XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, --- 2732,2792 ---- elog(ERROR, "invalid XLogFileRead source %d", source); } + /* + * If the segment was fetched from archival storage, replace + * the existing xlog segment (if any) with the archival version. 
+ */ + if (source == XLOG_FROM_ARCHIVE) + { + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + XLogRecPtr endptr; + char xlogfpath[MAXPGPATH]; + bool reload = false; + struct stat statbuf; + + XLogFilePath(xlogfpath, tli, log, seg); + if (stat(xlogfpath, &statbuf) == 0) + { + if (unlink(xlogfpath) != 0) + ereport(FATAL, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", + xlogfpath))); + reload = true; + } + + if (rename(path, xlogfpath) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rename file \"%s\" to \"%s\": %m", + path, xlogfpath))); + + /* + * If the existing segment was replaced, since walsenders might have + * it open, request them to reload a currently-open segment. + */ + if (reload) + WalSndRqstFileReload(); + + /* + * Calculate the end location of the restored WAL file and save it in + * shmem. It's used as current standby flush position, and cascading + * walsenders try to send WAL records up to this location. + */ + endptr.xlogid = log; + endptr.xrecoff = seg * XLogSegSize; + XLByteAdvance(endptr, XLogSegSize); + + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->restoreLastRecPtr = endptr; + SpinLockRelease(&xlogctl->info_lck); + + /* Signal walsender that new WAL has arrived */ + if (AllowCascadeReplication()) + WalSndWakeup(); + } + fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); if (fd >= 0) { *************** *** 3361,3378 **** RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr) strspn(xlde->d_name, "0123456789ABCDEF") == 24 && strcmp(xlde->d_name + 8, lastoff + 8) <= 0) { ! /* ! * Normally we don't delete old XLOG files during recovery to ! * avoid accidentally deleting a file that looks stale due to a ! * bug or hardware issue, but in fact contains important data. ! * During streaming recovery, however, we will eventually fill the ! * disk if we never clean up, so we have to. That's not an issue ! 
* with file-based archive recovery because in that case we ! * restore one XLOG file at a time, on-demand, and with a ! * different filename that can't be confused with regular XLOG ! * files. ! */ ! if (WalRcvInProgress() || XLogArchiveCheckDone(xlde->d_name)) { snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name); --- 3419,3425 ---- strspn(xlde->d_name, "0123456789ABCDEF") == 24 && strcmp(xlde->d_name + 8, lastoff + 8) <= 0) { ! if (RecoveryInProgress() || XLogArchiveCheckDone(xlde->d_name)) { snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name); *************** *** 5484,5545 **** exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg) } /* ! * If the segment was fetched from archival storage, we want to replace ! * the existing xlog segment (if any) with the archival version. This is ! * because whatever is in XLOGDIR is very possibly older than what we have ! * from the archives, since it could have come from restoring a PGDATA ! * backup. In any case, the archival version certainly is more ! * descriptive of what our current database state is, because that is what ! * we replayed from. * ! * Note that if we are establishing a new timeline, ThisTimeLineID is ! * already set to the new value, and so we will create a new file instead ! * of overwriting any existing file. (This is, in fact, always the case ! * at present.) */ ! snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG"); ! XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg); ! ! if (restoredFromArchive) { ! ereport(DEBUG3, ! (errmsg_internal("moving last restored xlog to \"%s\"", ! xlogpath))); ! unlink(xlogpath); /* might or might not exist */ ! if (rename(recoveryPath, xlogpath) != 0) ! ereport(FATAL, ! (errcode_for_file_access(), ! errmsg("could not rename file \"%s\" to \"%s\": %m", ! recoveryPath, xlogpath))); ! /* XXX might we need to fix permissions on the file? */ ! } ! else ! { ! /* ! * If the latest segment is not archival, but there's still a ! 
* RECOVERYXLOG laying about, get rid of it. ! */ ! unlink(recoveryPath); /* ignore any error */ ! /* ! * If we are establishing a new timeline, we have to copy data from ! * the last WAL segment of the old timeline to create a starting WAL ! * segment for the new timeline. ! * ! * Notify the archiver that the last WAL segment of the old timeline ! * is ready to copy to archival storage. Otherwise, it is not archived ! * for a while. ! */ ! if (endTLI != ThisTimeLineID) { ! XLogFileCopy(endLogId, endLogSeg, ! endTLI, endLogId, endLogSeg); ! ! if (XLogArchivingActive()) ! { ! XLogFileName(xlogpath, endTLI, endLogId, endLogSeg); ! XLogArchiveNotify(xlogpath); ! } } } --- 5531,5553 ---- } /* ! * If we are establishing a new timeline, we have to copy data from ! * the last WAL segment of the old timeline to create a starting WAL ! * segment for the new timeline. * ! * Notify the archiver that the last WAL segment of the old timeline ! * is ready to copy to archival storage. Otherwise, it is not archived ! * for a while. */ ! if (endTLI != ThisTimeLineID) { ! XLogFileCopy(endLogId, endLogSeg, ! endTLI, endLogId, endLogSeg); ! if (XLogArchivingActive()) { ! XLogFileName(xlogpath, endTLI, endLogId, endLogSeg); ! XLogArchiveNotify(xlogpath); } } *************** *** 5550,5555 **** exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg) --- 5558,5570 ---- XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg); XLogArchiveCleanup(xlogpath); + /* + * Since there might be a partial WAL segment named RECOVERYXLOG, + * get rid of it. + */ + snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG"); + unlink(recoveryPath); /* ignore any error */ + /* Get rid of any remaining recovered timeline-history file, too */ snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYHISTORY"); unlink(recoveryPath); /* ignore any error */ *************** *** 7871,7916 **** CreateCheckPoint(int flags) */ if (_logId || _logSeg) { ! /* ! 
* Calculate the last segment that we need to retain because of ! * wal_keep_segments, by subtracting wal_keep_segments from the new ! * checkpoint location. ! */ ! if (wal_keep_segments > 0) ! { ! uint32 log; ! uint32 seg; ! int d_log; ! int d_seg; ! ! XLByteToSeg(recptr, log, seg); ! ! d_seg = wal_keep_segments % XLogSegsPerFile; ! d_log = wal_keep_segments / XLogSegsPerFile; ! if (seg < d_seg) ! { ! d_log += 1; ! seg = seg - d_seg + XLogSegsPerFile; ! } ! else ! seg = seg - d_seg; ! /* avoid underflow, don't go below (0,1) */ ! if (log < d_log || (log == d_log && seg == 0)) ! { ! log = 0; ! seg = 1; ! } ! else ! log = log - d_log; ! ! /* don't delete WAL segments newer than the calculated segment */ ! if (log < _logId || (log == _logId && seg < _logSeg)) ! { ! _logId = log; ! _logSeg = seg; ! } ! } ! PrevLogSeg(_logId, _logSeg); RemoveOldXlogFiles(_logId, _logSeg, recptr); } --- 7886,7892 ---- */ if (_logId || _logSeg) { ! WalKeepSegments(&_logId, &_logSeg, recptr); PrevLogSeg(_logId, _logSeg); RemoveOldXlogFiles(_logId, _logSeg, recptr); } *************** *** 8151,8167 **** CreateRestartPoint(int flags) /* * Delete old log files (those no longer needed even for previous * checkpoint/restartpoint) to prevent the disk holding the xlog from ! * growing full. We don't need do this during normal recovery, but during ! * streaming recovery we have to or the disk will eventually fill up from ! * old log files streamed from master. */ ! if (WalRcvInProgress() && (_logId || _logSeg)) { XLogRecPtr endptr; /* Get the current (or recent) end of xlog */ ! endptr = GetWalRcvWriteRecPtr(NULL); PrevLogSeg(_logId, _logSeg); RemoveOldXlogFiles(_logId, _logSeg, endptr); --- 8127,8142 ---- /* * Delete old log files (those no longer needed even for previous * checkpoint/restartpoint) to prevent the disk holding the xlog from ! * growing full. */ ! if (_logId || _logSeg) { XLogRecPtr endptr; /* Get the current (or recent) end of xlog */ ! 
endptr = GetStandbyFlushRecPtr(); + WalKeepSegments(&_logId, &_logSeg, endptr); PrevLogSeg(_logId, _logSeg); RemoveOldXlogFiles(_logId, _logSeg, endptr); *************** *** 8207,8212 **** CreateRestartPoint(int flags) --- 8182,8231 ---- } /* + * Calculate the last segment that we need to retain because of + * wal_keep_segments, by subtracting wal_keep_segments from + * the given location. + */ + static void + WalKeepSegments(uint32 *logId, uint32 *logSeg, XLogRecPtr recptr) + { + uint32 log; + uint32 seg; + int d_log; + int d_seg; + + if (wal_keep_segments == 0) + return; + + XLByteToSeg(recptr, log, seg); + + d_seg = wal_keep_segments % XLogSegsPerFile; + d_log = wal_keep_segments / XLogSegsPerFile; + if (seg < d_seg) + { + d_log += 1; + seg = seg - d_seg + XLogSegsPerFile; + } + else + seg = seg - d_seg; + /* avoid underflow, don't go below (0,1) */ + if (log < d_log || (log == d_log && seg == 0)) + { + log = 0; + seg = 1; + } + else + log = log - d_log; + + /* don't delete WAL segments newer than the calculated segment */ + if (log < *logId || (log == *logId && seg < *logSeg)) + { + *logId = log; + *logSeg = seg; + } + } + + /* * Write a NEXTOID log record */ void *************** *** 9549,9558 **** pg_last_xlog_receive_location(PG_FUNCTION_ARGS) /* * Get latest redo apply position. * * Exported to allow WALReceiver to read the pointer directly. */ XLogRecPtr ! GetXLogReplayRecPtr(void) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; --- 9568,9581 ---- /* * Get latest redo apply position. * + * Optionally, returns the end byte position of the last restored + * WAL segment. Callers not interested in that value may pass + * NULL for restoreLastRecPtr. + * * Exported to allow WALReceiver to read the pointer directly. */ XLogRecPtr ! 
GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; *************** *** 9560,9571 **** GetXLogReplayRecPtr(void) --- 9583,9616 ---- SpinLockAcquire(&xlogctl->info_lck); recptr = xlogctl->recoveryLastRecPtr; + if (restoreLastRecPtr) + *restoreLastRecPtr = xlogctl->restoreLastRecPtr; SpinLockRelease(&xlogctl->info_lck); return recptr; } /* + * Get current standby flush position, ie, the last WAL position + * known to be fsync'd to disk in standby. + */ + XLogRecPtr + GetStandbyFlushRecPtr(void) + { + XLogRecPtr receivePtr; + XLogRecPtr replayPtr; + XLogRecPtr restorePtr; + + receivePtr = GetWalRcvWriteRecPtr(NULL); + replayPtr = GetXLogReplayRecPtr(&restorePtr); + + if (XLByteLT(receivePtr, replayPtr)) + return XLByteLT(replayPtr, restorePtr) ? restorePtr : replayPtr; + else + return XLByteLT(receivePtr, restorePtr) ? restorePtr : receivePtr; + } + + /* * Report the last WAL replay location (same format as pg_start_backup etc) * * This is useful for determining how much of WAL is visible to read-only *************** *** 9577,9583 **** pg_last_xlog_replay_location(PG_FUNCTION_ARGS) XLogRecPtr recptr; char location[MAXFNAMELEN]; ! recptr = GetXLogReplayRecPtr(); if (recptr.xlogid == 0 && recptr.xrecoff == 0) PG_RETURN_NULL(); --- 9622,9628 ---- XLogRecPtr recptr; char location[MAXFNAMELEN]; ! recptr = GetXLogReplayRecPtr(NULL); if (recptr.xlogid == 0 && recptr.xrecoff == 0) PG_RETURN_NULL(); *** a/src/backend/postmaster/postmaster.c --- b/src/backend/postmaster/postmaster.c *************** *** 2317,2322 **** reaper(SIGNAL_ARGS) --- 2317,2331 ---- pmState = PM_RUN; /* + * Kill the cascading walsender to urge the cascaded standby to + * reread the timeline history file, adjust its timeline and + * establish replication connection again. This is required + * because the timeline of cascading standby is not consistent + * with that of cascaded one just after failover. 
+ */ + SignalSomeChildren(SIGUSR2, BACKEND_TYPE_WALSND); + + /* * Crank up the background writer, if we didn't do that already * when we entered consistent recovery state. It doesn't matter * if this fails, we'll just try again later. *** a/src/backend/replication/basebackup.c --- b/src/backend/replication/basebackup.c *************** *** 339,344 **** SendBaseBackup(BaseBackupCmd *cmd) --- 339,349 ---- MemoryContext old_context; basebackup_options opt; + if (cascading_walsender) + ereport(FATAL, + (errcode(ERRCODE_CANNOT_CONNECT_NOW), + errmsg("recovery is still in progress, can't accept WAL streaming connections for backup"))); + parse_basebackup_options(cmd->options, &opt); backup_context = AllocSetContextCreate(CurrentMemoryContext, *** a/src/backend/replication/syncrep.c --- b/src/backend/replication/syncrep.c *************** *** 469,474 **** SyncRepGetStandbyPriority(void) --- 469,481 ---- int priority = 0; bool found = false; + /* + * Since synchronous cascade replication is not allowed, we always + * set the priority of cascading walsender to zero. + */ + if (cascading_walsender) + return 0; + /* Need a modifiable copy of string */ rawstring = pstrdup(SyncRepStandbyNames); *** a/src/backend/replication/walreceiver.c --- b/src/backend/replication/walreceiver.c *************** *** 44,49 **** --- 44,50 ---- #include "miscadmin.h" #include "replication/walprotocol.h" #include "replication/walreceiver.h" + #include "replication/walsender.h" #include "storage/ipc.h" #include "storage/pmsignal.h" #include "storage/procarray.h" *************** *** 564,571 **** XLogWalRcvFlush(bool dying) } SpinLockRelease(&walrcv->mutex); ! /* Signal the startup process that new WAL has arrived */ WakeupRecovery(); /* Report XLOG streaming progress in PS display */ if (update_process_title) --- 565,574 ---- } SpinLockRelease(&walrcv->mutex); ! 
/* Signal the startup process and walsender that new WAL has arrived */ WakeupRecovery(); + if (AllowCascadeReplication()) + WalSndWakeup(); /* Report XLOG streaming progress in PS display */ if (update_process_title) *************** *** 625,631 **** XLogWalRcvSendReply(void) /* Construct a new message */ reply_message.write = LogstreamResult.Write; reply_message.flush = LogstreamResult.Flush; ! reply_message.apply = GetXLogReplayRecPtr(); reply_message.sendTime = now; elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", --- 628,634 ---- /* Construct a new message */ reply_message.write = LogstreamResult.Write; reply_message.flush = LogstreamResult.Flush; ! reply_message.apply = GetXLogReplayRecPtr(NULL); reply_message.sendTime = now; elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", *** a/src/backend/replication/walsender.c --- b/src/backend/replication/walsender.c *************** *** 48,53 **** --- 48,54 ---- #include "replication/basebackup.h" #include "replication/replnodes.h" #include "replication/walprotocol.h" + #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" *************** *** 70,75 **** WalSnd *MyWalSnd = NULL; --- 71,77 ---- /* Global state */ bool am_walsender = false; /* Am I a walsender process ? */ + bool cascading_walsender = false; /* Am I cascading WAL to another standby ? */ /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ *************** *** 135,144 **** WalSenderMain(void) { MemoryContext walsnd_context; ! if (RecoveryInProgress()) ! ereport(FATAL, ! (errcode(ERRCODE_CANNOT_CONNECT_NOW), ! errmsg("recovery is still in progress, can't accept WAL streaming connections"))); /* Create a per-walsender data structure in shared memory */ InitWalSnd(); --- 137,143 ---- { MemoryContext walsnd_context; ! 
cascading_walsender = RecoveryInProgress(); /* Create a per-walsender data structure in shared memory */ InitWalSnd(); *************** *** 165,170 **** WalSenderMain(void) --- 164,175 ---- /* Unblock signals (they were blocked when the postmaster forked us) */ PG_SETMASK(&UnBlockSig); + /* + * Use the recovery target timeline ID during recovery + */ + if (cascading_walsender) + ThisTimeLineID = GetRecoveryTargetTLI(); + /* Tell the standby that walsender is ready for receiving commands */ ReadyForQuery(DestRemote); *************** *** 290,296 **** IdentifySystem(void) GetSystemIdentifier()); snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); ! logptr = GetInsertRecPtr(); snprintf(xpos, sizeof(xpos), "%X/%X", logptr.xlogid, logptr.xrecoff); --- 295,301 ---- GetSystemIdentifier()); snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); ! logptr = cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr(); snprintf(xpos, sizeof(xpos), "%X/%X", logptr.xlogid, logptr.xrecoff); *************** *** 372,379 **** StartReplication(StartReplicationCmd *cmd) * directory that was created with 'minimal'. So this is not bulletproof, * the purpose is just to give a user-friendly error message that hints * how to configure the system correctly. */ ! if (wal_level == WAL_LEVEL_MINIMAL) ereport(FATAL, (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("standby connections not allowed because wal_level=minimal"))); --- 377,388 ---- * directory that was created with 'minimal'. So this is not bulletproof, * the purpose is just to give a user-friendly error message that hints * how to configure the system correctly. + * + * NOTE: The existence of cascading walsender means that wal_level is set + * to hot_standby in the master. So we don't need to check the value of + * wal_level during recovery. */ ! 
if (!cascading_walsender && wal_level == WAL_LEVEL_MINIMAL) ereport(FATAL, (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("standby connections not allowed because wal_level=minimal"))); *************** *** 601,607 **** ProcessStandbyReplyMessage(void) SpinLockRelease(&walsnd->mutex); } ! SyncRepReleaseWaiters(); } /* --- 610,617 ---- SpinLockRelease(&walsnd->mutex); } ! if (!cascading_walsender) ! SyncRepReleaseWaiters(); } /* *************** *** 770,776 **** WalSndLoop(void) --- 780,793 ---- XLogSend(output_message, &caughtup); ProcessRepliesIfAny(); if (caughtup && !pq_is_send_pending()) + { walsender_shutdown_requested = true; + + if (cascading_walsender) + ereport(LOG, + (errmsg("terminating walsender process to request the cascaded " + "standby to update timeline and reconnect"))); + } } if ((caughtup || pq_is_send_pending()) && *************** *** 933,939 **** WalSndKill(int code, Datum arg) } /* ! * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr' * * XXX probably this should be improved to suck data directly from the * WAL buffers when possible. --- 950,956 ---- } /* ! * Read 'count' bytes from WAL into 'buf', starting at location 'startptr' * * XXX probably this should be improved to suck data directly from the * WAL buffers when possible. *************** *** 944,958 **** WalSndKill(int code, Datum arg) * more than one. */ void ! XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) { ! XLogRecPtr startRecPtr = recptr; ! char path[MAXPGPATH]; uint32 lastRemovedLog; uint32 lastRemovedSeg; uint32 log; uint32 seg; while (nbytes > 0) { uint32 startoff; --- 961,981 ---- * more than one. */ void ! XLogRead(char *buf, XLogRecPtr startptr, Size count) { ! char *p; ! XLogRecPtr recptr; ! 
Size nbytes; uint32 lastRemovedLog; uint32 lastRemovedSeg; uint32 log; uint32 seg; + retry: + p = buf; + recptr = startptr; + nbytes = count; + while (nbytes > 0) { uint32 startoff; *************** *** 963,968 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) --- 986,993 ---- if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg)) { + char path[MAXPGPATH]; + /* Switch to another logfile segment */ if (sendFile >= 0) close(sendFile); *************** *** 1014,1020 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) else segbytes = nbytes; ! readbytes = read(sendFile, buf, segbytes); if (readbytes <= 0) ereport(ERROR, (errcode_for_file_access(), --- 1039,1045 ---- else segbytes = nbytes; ! readbytes = read(sendFile, p, segbytes); if (readbytes <= 0) ereport(ERROR, (errcode_for_file_access(), *************** *** 1027,1033 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) sendOff += readbytes; nbytes -= readbytes; ! buf += readbytes; } /* --- 1052,1058 ---- sendOff += readbytes; nbytes -= readbytes; ! p += readbytes; } /* *************** *** 1038,1044 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) * already have been overwritten with new WAL records. */ XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg); ! XLByteToSeg(startRecPtr, log, seg); if (log < lastRemovedLog || (log == lastRemovedLog && seg <= lastRemovedSeg)) { --- 1063,1069 ---- * already have been overwritten with new WAL records. */ XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg); ! XLByteToSeg(startptr, log, seg); if (log < lastRemovedLog || (log == lastRemovedLog && seg <= lastRemovedSeg)) { *************** *** 1050,1055 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) --- 1075,1106 ---- errmsg("requested WAL segment %s has already been removed", filename))); } + + /* + * During recovery, the currently-open WAL file might be replaced with + * the file of the same name retrieved from archive. 
So we always need + * to check what we read was valid after reading into the buffer. If it's + * invalid, we try to open and read the file again. + */ + if (cascading_walsender) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + bool reload; + + SpinLockAcquire(&walsnd->mutex); + reload = walsnd->reload; + walsnd->reload = false; + SpinLockRelease(&walsnd->mutex); + + if (reload && sendFile >= 0) + { + close(sendFile); + sendFile = -1; + + goto retry; + } + } } /* *************** *** 1082,1088 **** XLogSend(char *msgbuf, bool *caughtup) * subsequently crashes and restarts, slaves must not have applied any WAL * that gets lost on the master. */ ! SendRqstPtr = GetFlushRecPtr(); /* Quick exit if nothing to do */ if (XLByteLE(SendRqstPtr, sentPtr)) --- 1133,1139 ---- * subsequently crashes and restarts, slaves must not have applied any WAL * that gets lost on the master. */ ! SendRqstPtr = cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr(); /* Quick exit if nothing to do */ if (XLByteLE(SendRqstPtr, sentPtr)) *************** *** 1187,1192 **** XLogSend(char *msgbuf, bool *caughtup) --- 1238,1265 ---- return; } + /* + * Request walsenders to reload the currently-open WAL file + */ + void + WalSndRqstFileReload(void) + { + int i; + + for (i = 0; i < max_wal_senders; i++) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + if (walsnd->pid == 0) + continue; + + SpinLockAcquire(&walsnd->mutex); + walsnd->reload = true; + SpinLockRelease(&walsnd->mutex); + } + } + /* SIGHUP: set flag to re-read config file at next convenient time */ static void WalSndSigHupHandler(SIGNAL_ARGS) *** a/src/include/access/xlog.h --- b/src/include/access/xlog.h *************** *** 221,226 **** extern int wal_level; --- 221,229 ---- /* Do we need to WAL-log information required only for Hot Standby? 
*/ #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_HOT_STANDBY) + /* Can we allow the standby to accept replication connection from another standby? */ + #define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0) + #ifdef WAL_DEBUG extern bool XLOG_DEBUG; #endif *************** *** 292,298 **** extern bool RecoveryInProgress(void); extern bool HotStandbyActive(void); extern bool XLogInsertAllowed(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); ! extern XLogRecPtr GetXLogReplayRecPtr(void); extern void UpdateControlFile(void); extern uint64 GetSystemIdentifier(void); --- 295,302 ---- extern bool HotStandbyActive(void); extern bool XLogInsertAllowed(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); ! extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr); ! extern XLogRecPtr GetStandbyFlushRecPtr(void); extern void UpdateControlFile(void); extern uint64 GetSystemIdentifier(void); *** a/src/include/replication/walsender.h --- b/src/include/replication/walsender.h *************** *** 35,40 **** typedef struct WalSnd --- 35,41 ---- pid_t pid; /* this walsender's process id, or 0 */ WalSndState state; /* this walsender's state */ XLogRecPtr sentPtr; /* WAL has been sent up to this point */ + bool reload; /* does currently-open file need to be reloaded? */ /* * The xlog locations that have been written, flushed, and applied by *************** *** 92,97 **** extern WalSndCtlData *WalSndCtl; --- 93,99 ---- /* global state */ extern bool am_walsender; + extern bool cascading_walsender; extern volatile sig_atomic_t walsender_shutdown_requested; extern volatile sig_atomic_t walsender_ready_to_stop; *************** *** 106,112 **** extern Size WalSndShmemSize(void); extern void WalSndShmemInit(void); extern void WalSndWakeup(void); extern void WalSndSetState(WalSndState state); ! 
extern void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes); extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS); --- 108,115 ---- extern void WalSndShmemInit(void); extern void WalSndWakeup(void); extern void WalSndSetState(WalSndState state); ! extern void XLogRead(char *buf, XLogRecPtr startptr, Size count); ! extern void WalSndRqstFileReload(void); extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);
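The nested comparison in `GetStandbyFlushRecPtr()` above amounts to taking the largest of the received, replayed and restored positions. A standalone sketch of that rule, for checking the logic outside the server, might look like this. `WalPos`, `wal_pos_lt`, `wal_pos_max` and `standby_flush_pos` are hypothetical names used only for illustration; the patch itself works on `XLogRecPtr` with `XLByteLT`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical two-part WAL position, mirroring xlogid/xrecoff. */
typedef struct WalPos
{
    uint32_t xlogid;   /* log file number */
    uint32_t xrecoff;  /* byte offset within that log file */
} WalPos;

/* True if a is strictly before b. */
bool
wal_pos_lt(WalPos a, WalPos b)
{
    return a.xlogid < b.xlogid ||
           (a.xlogid == b.xlogid && a.xrecoff < b.xrecoff);
}

/* The later of two positions. */
WalPos
wal_pos_max(WalPos a, WalPos b)
{
    return wal_pos_lt(a, b) ? b : a;
}

/* Largest of the received, replayed and restored positions. */
WalPos
standby_flush_pos(WalPos received, WalPos replayed, WalPos restored)
{
    WalPos result = wal_pos_max(wal_pos_max(received, replayed), restored);

    /* The result can never be behind any of the inputs. */
    assert(!wal_pos_lt(result, received));
    assert(!wal_pos_lt(result, replayed));
    assert(!wal_pos_lt(result, restored));
    return result;
}
```

Written as a maximum, the rule makes it easier to see why a cascading walsender can send WAL that arrived either by streaming or by restore from the archive: whichever source is furthest ahead sets the limit.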