On Mon, Jul 4, 2011 at 6:24 PM, Simon Riggs <si...@2ndquadrant.com> wrote:
> On Tue, Jun 14, 2011 at 6:08 AM, Fujii Masao <masao.fu...@gmail.com> wrote:
>
>>> The standby must not accept replication connection from that standby itself.
>>> Otherwise, since any new WAL data would not appear in that standby,
>>> replication cannot advance any more. As a safeguard against this, I
>>> introduced
>>> new ID to identify each instance. The walsender sends that ID as the fourth
>>> field of the reply of IDENTIFY_SYSTEM, and then walreceiver checks whether
>>> the IDs are the same between two servers. If they are the same, which means
>>> that the standby is just connecting to that standby itself, so walreceiver
>>> emits ERROR.
>
> Thanks for waiting for review.
Thanks for the review!

> This part of the patch is troubling me. I think you have identified an
> important problem, but this solution doesn't work fully.
>
> If we allow standbys to connect to other standbys then we have
> problems with standbys not being connected to master. This can occur
> with a 1-step connection, as you point out, but it could also occur
> with a 2-step, 3-step or more connection, where a circle of standbys
> are all depending upon each other. Your solution only works for 1-step
> connections. "Solving" that problem in a general sense might be more
> dangerous than leaving it alone. I think we should think some more
> about the issues there and approach them as a separate problem.
>
> I think we should remove that and just focus on the main problem, for
> now. That will make it a simpler patch and easier to commit.

I agree that we should focus on the main problem first, so I removed that
part. Attached is the updated version.

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
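The point about 2-step and 3-step circles can be illustrated with a small model. This is a hypothetical sketch, not code from the patch: it assumes a global view of the topology, stored as an array where upstream[i] is the server that standby i streams from, which no individual standby actually has. A check that only rejects a direct self-connection passes every member of a circle such as 1 -> 2 -> 3 -> 1, while walking the whole chain shows that none of them ever reaches the master.

```c
#include <stdbool.h>

/* Hypothetical topology model (not PostgreSQL code): upstream[i] is the
 * index of the server that server i streams WAL from, and MASTER marks
 * the primary, which has no upstream. */
#define MASTER (-1)

/* Models the safeguard removed from the patch: only a direct
 * self-connection (a 1-step loop) is rejected. */
bool
one_step_check_ok(const int *upstream, int i)
{
    return upstream[i] != i;
}

/* Follows the whole upstream chain from server i.  A chain that reaches
 * the master visits each server at most once, so taking more than
 * nservers hops without reaching it means the chain loops. */
bool
reaches_master(const int *upstream, int nservers, int i)
{
    int cur = i;
    int hops;

    for (hops = 0; hops <= nservers; hops++)
    {
        if (upstream[cur] == MASTER)
            return true;
        cur = upstream[cur];
    }
    return false;
}
```

With the topology {MASTER, 2, 3, 1}, one_step_check_ok() returns true for standbys 1, 2 and 3, while reaches_master() returns false for all of them, which is exactly the gap in the 1-step safeguard.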
*** a/doc/src/sgml/config.sgml --- b/doc/src/sgml/config.sgml *************** *** 1998,2004 **** SET ENABLE_SEQSCAN TO OFF; doesn't keep any extra segments for standby purposes, and the number of old WAL segments available to standby servers is a function of the location of the previous checkpoint and status of WAL ! archiving. This parameter has no effect on restartpoints. This parameter can only be set in the <filename>postgresql.conf</> file or on the server command line. </para> --- 1998,2004 ---- doesn't keep any extra segments for standby purposes, and the number of old WAL segments available to standby servers is a function of the location of the previous checkpoint and status of WAL ! archiving. This parameter can only be set in the <filename>postgresql.conf</> file or on the server command line. </para> *************** *** 2105,2111 **** SET ENABLE_SEQSCAN TO OFF; synchronous replication is enabled, individual transactions can be configured not to wait for replication by setting the <xref linkend="guc-synchronous-commit"> parameter to ! <literal>local</> or <literal>off</>. </para> <para> This parameter can only be set in the <filename>postgresql.conf</> --- 2105,2112 ---- synchronous replication is enabled, individual transactions can be configured not to wait for replication by setting the <xref linkend="guc-synchronous-commit"> parameter to ! <literal>local</> or <literal>off</>. This parameter has no effect on ! cascade replication. </para> <para> This parameter can only be set in the <filename>postgresql.conf</> *** a/doc/src/sgml/high-availability.sgml --- b/doc/src/sgml/high-availability.sgml *************** *** 877,884 **** primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass' --- 877,921 ---- network delay, or that the standby is under heavy load. 
</para> </sect3> + </sect2> + + <sect2 id="cascade-replication"> + <title>Cascade Replication</title> + <indexterm zone="high-availability"> + <primary>Cascade Replication</primary> + </indexterm> + <para> + The cascade replication feature allows a standby to accept replication + connections and stream WAL records to other standbys. This is useful + for reducing the number of standbys connecting directly to the master, + and thus the load on the master, when you have many standbys. + </para> + <para> + A cascading standby sends not only the WAL records received from the + master but also those restored from the archive. So even if its upstream + replication connection is terminated, cascade replication can continue. + </para> + <para> + Cascade replication is asynchronous. Note that synchronous replication + (see <xref linkend="synchronous-replication">) has no effect on cascade + replication. + </para> + <para> + Promoting a cascading standby terminates all the cascade replication + connections it serves. This is because its timeline then differs from + that of the cascaded standbys, so they cannot continue replication. + </para> + <para> + To use cascade replication, set up the cascading standby so that it can + accept replication connections, i.e., set <varname>max_wal_senders</>, + <varname>hot_standby</> and the authentication options (see + <xref linkend="streaming-replication"> and <xref linkend="hot-standby">). + Also set <varname>primary_conninfo</> in the cascaded standby to point + to the cascading standby.
+ </para> </sect2> + <sect2 id="synchronous-replication"> <title>Synchronous Replication</title> *** a/src/backend/access/transam/xlog.c --- b/src/backend/access/transam/xlog.c *************** *** 446,451 **** typedef struct XLogCtlData --- 446,453 ---- XLogRecPtr recoveryLastRecPtr; /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */ TimestampTz recoveryLastXTime; + /* end of the last record restored from the archive */ + XLogRecPtr restoreLastRecPtr; /* Are we requested to pause recovery? */ bool recoveryPause; *************** *** 618,625 **** static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites, static bool AdvanceXLInsertBuffer(bool new_segment); static bool XLogCheckpointNeeded(uint32 logid, uint32 logseg); static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch); ! static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath, ! bool find_free, int *max_advance, bool use_lock); static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, int source, bool notexistOk); --- 620,629 ---- static bool AdvanceXLInsertBuffer(bool new_segment); static bool XLogCheckpointNeeded(uint32 logid, uint32 logseg); static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch); ! static void XLogFileCopy(TimeLineID tli, uint32 log, uint32 seg, char *srcpath, ! uint32 offset); ! static bool InstallXLogFileSegment(TimeLineID tli, uint32 *log, uint32 *seg, ! char *tmppath, bool find_free, int *max_advance, bool use_lock); static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, int source, bool notexistOk); *************** *** 1742,1748 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) /* create/use new log file */ use_existent = true; ! openLogFile = XLogFileInit(openLogId, openLogSeg, &use_existent, true); openLogOff = 0; } --- 1746,1752 ---- /* create/use new log file */ use_existent = true; ! 
openLogFile = XLogFileInit(ThisTimeLineID, openLogId, openLogSeg, &use_existent, true); openLogOff = 0; } *************** *** 2304,2310 **** XLogNeedsFlush(XLogRecPtr record) /* * Create a new XLOG file segment, or open a pre-existing one. * ! * log, seg: identify segment to be created/opened. * * *use_existent: if TRUE, OK to use a pre-existing file (else, any * pre-existing file will be deleted). On return, TRUE if a pre-existing --- 2308,2314 ---- /* * Create a new XLOG file segment, or open a pre-existing one. * ! * tli, log, seg: identify segment to be created/opened. * * *use_existent: if TRUE, OK to use a pre-existing file (else, any * pre-existing file will be deleted). On return, TRUE if a pre-existing *************** *** 2322,2328 **** XLogNeedsFlush(XLogRecPtr record) * in a critical section. */ int ! XLogFileInit(uint32 log, uint32 seg, bool *use_existent, bool use_lock) { char path[MAXPGPATH]; --- 2326,2332 ---- * in a critical section. */ int ! XLogFileInit(TimeLineID tli, uint32 log, uint32 seg, bool *use_existent, bool use_lock) { char path[MAXPGPATH]; *************** *** 2334,2340 **** XLogFileInit(uint32 log, uint32 seg, int fd; int nbytes; ! XLogFilePath(path, ThisTimeLineID, log, seg); /* * Try to use existent file (checkpoint maker may have created it already) --- 2338,2344 ---- int fd; int nbytes; ! XLogFilePath(path, tli, log, seg); /* * Try to use existent file (checkpoint maker may have created it already) *************** *** 2431,2437 **** XLogFileInit(uint32 log, uint32 seg, installed_log = log; installed_seg = seg; max_advance = XLOGfileslop; ! if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath, *use_existent, &max_advance, use_lock)) { --- 2435,2441 ---- installed_log = log; installed_seg = seg; max_advance = XLOGfileslop; ! 
if (!InstallXLogFileSegment(tli, &installed_log, &installed_seg, tmppath, *use_existent, &max_advance, use_lock)) { *************** *** 2463,2564 **** XLogFileInit(uint32 log, uint32 seg, /* * Create a new XLOG file segment by copying a pre-existing one. * ! * log, seg: identify segment to be created. * ! * srcTLI, srclog, srcseg: identify segment to be copied (could be from ! * a different timeline) * ! * Currently this is only used during recovery, and so there are no locking ! * considerations. But we should be just as tense as XLogFileInit to avoid ! * emplacing a bogus file. */ static void ! XLogFileCopy(uint32 log, uint32 seg, ! TimeLineID srcTLI, uint32 srclog, uint32 srcseg) { - char path[MAXPGPATH]; - char tmppath[MAXPGPATH]; char buffer[XLOG_BLCKSZ]; ! int srcfd; ! int fd; ! int nbytes; /* * Open the source file */ ! XLogFilePath(path, srcTLI, srclog, srcseg); ! srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); if (srcfd < 0) ereport(ERROR, (errcode_for_file_access(), ! errmsg("could not open file \"%s\": %m", path))); ! /* ! * Copy into a temp file name. ! */ ! snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid()); ! unlink(tmppath); ! /* do not use get_sync_bit() here --- want to fsync only at end of fill */ ! fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, ! S_IRUSR | S_IWUSR); ! if (fd < 0) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not create file \"%s\": %m", tmppath))); /* * Do the data copying. */ ! for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(buffer)) { ! errno = 0; ! if ((int) read(srcfd, buffer, sizeof(buffer)) != (int) sizeof(buffer)) ! { ! if (errno != 0) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not read file \"%s\": %m", path))); ! else ! ereport(ERROR, ! (errmsg("not enough data in file \"%s\"", path))); ! } ! errno = 0; ! if ((int) write(fd, buffer, sizeof(buffer)) != (int) sizeof(buffer)) ! { ! int save_errno = errno; ! /* ! 
* If we fail to make the file, delete it to release disk space ! */ ! unlink(tmppath); ! /* if write didn't set errno, assume problem is no disk space */ ! errno = save_errno ? save_errno : ENOSPC; ereport(ERROR, (errcode_for_file_access(), ! errmsg("could not write to file \"%s\": %m", tmppath))); } } ! if (pg_fsync(fd) != 0) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not fsync file \"%s\": %m", tmppath))); if (close(fd)) ereport(ERROR, (errcode_for_file_access(), ! errmsg("could not close file \"%s\": %m", tmppath))); close(srcfd); - - /* - * Now move the segment into place with its final name. - */ - if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false)) - elog(ERROR, "InstallXLogFileSegment should not have failed"); } /* --- 2467,2557 ---- /* * Create a new XLOG file segment by copying a pre-existing one. * ! * tli, log, seg: identify segment to be created. * ! * srcpath: identify segment to be copied. * ! * offset: identify offset to start copying from. */ static void ! XLogFileCopy(TimeLineID tli, uint32 log, uint32 seg, char *srcpath, uint32 offset) { char buffer[XLOG_BLCKSZ]; ! int srcfd; ! int fd; ! bool use_existent; /* * Open the source file */ ! srcfd = BasicOpenFile(srcpath, O_RDONLY | PG_BINARY, 0); if (srcfd < 0) ereport(ERROR, (errcode_for_file_access(), ! errmsg("could not open file \"%s\": %m", srcpath))); ! /* Create/use new log file */ ! use_existent = true; ! fd = XLogFileInit(tli, log, seg, &use_existent, true); ! /* Need to seek in the file? */ ! if (offset != 0) ! { ! if (lseek(srcfd, (off_t) offset, SEEK_SET) < 0) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not seek in log file \"%s\": %m", ! srcpath))); ! if (lseek(fd, (off_t) offset, SEEK_SET) < 0) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not seek in log file %u, segment %u to offset %u: %m", ! log, seg, offset))); ! } /* * Do the data copying. */ ! while (offset < XLogSegSize) { ! int nbytes; ! 
nbytes = read(srcfd, buffer, sizeof(buffer)); ! if (nbytes <= 0) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not read from log file \"%s\": %m", ! srcpath))); + /* OK to write the logs */ + errno = 0; + if (write(fd, buffer, nbytes) != nbytes) + { + /* if write didn't set errno, assume no disk space */ + if (errno == 0) + errno = ENOSPC; ereport(ERROR, (errcode_for_file_access(), ! errmsg("could not write to log file %u, segment %u " ! "at offset %u, length %lu: %m", ! log, seg, offset, (unsigned long) nbytes))); } + + /* Update state for copy */ + offset += nbytes; } ! /* Issue appropriate kind of fsync */ ! issue_xlog_fsync(fd, log, seg); if (close(fd)) ereport(ERROR, (errcode_for_file_access(), ! errmsg("could not close log file %u, segment %u: %m", ! log, seg))); close(srcfd); } /* *************** *** 2567,2573 **** XLogFileCopy(uint32 log, uint32 seg, * This is used both to install a newly-created segment (which has a temp * filename while it's being created) and to recycle an old segment. * ! * *log, *seg: identify segment to install as (or first possible target). * When find_free is TRUE, these are modified on return to indicate the * actual installation location or last segment searched. * --- 2560,2566 ---- * This is used both to install a newly-created segment (which has a temp * filename while it's being created) and to recycle an old segment. * ! * tli, *log, *seg: identify segment to install as (or first possible target). * When find_free is TRUE, these are modified on return to indicate the * actual installation location or last segment searched. * *************** *** 2591,2604 **** XLogFileCopy(uint32 log, uint32 seg, * file into place. */ static bool ! InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath, bool find_free, int *max_advance, bool use_lock) { char path[MAXPGPATH]; struct stat stat_buf; ! XLogFilePath(path, ThisTimeLineID, *log, *seg); /* * We want to be sure that only one process does this at a time. 
--- 2584,2597 ---- * file into place. */ static bool ! InstallXLogFileSegment(TimeLineID tli, uint32 *log, uint32 *seg, char *tmppath, bool find_free, int *max_advance, bool use_lock) { char path[MAXPGPATH]; struct stat stat_buf; ! XLogFilePath(path, tli, *log, *seg); /* * We want to be sure that only one process does this at a time. *************** *** 2625,2631 **** InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath, } NextLogSeg(*log, *seg); (*max_advance)--; ! XLogFilePath(path, ThisTimeLineID, *log, *seg); } } --- 2618,2624 ---- } NextLogSeg(*log, *seg); (*max_advance)--; ! XLogFilePath(path, tli, *log, *seg); } } *************** *** 2729,2734 **** XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, --- 2722,2757 ---- elog(ERROR, "invalid XLogFileRead source %d", source); } + /* + * If cascade replication is allowed, and we've just restored an archived + * WAL file to temporary file, we copy it to the WAL file with correct + * name, so that cascading walsenders can treat it. + */ + if (source == XLOG_FROM_ARCHIVE && AllowCascadeReplication()) + { + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + XLogRecPtr endptr; + + XLogFileCopy(curFileTLI, log, seg, path, readOff); + + /* + * Calculate the end location of the restored WAL file and save it in + * shmem. It's used as current standby flush position, and cascading + * walsenders try to send WAL records up to this location. + */ + endptr.xlogid = log; + endptr.xrecoff = seg * XLogSegSize; + XLByteAdvance(endptr, XLogSegSize); + + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->restoreLastRecPtr = endptr; + SpinLockRelease(&xlogctl->info_lck); + + /* Signal walsender that new WAL has arrived */ + WalSndWakeup(); + } + fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); if (fd >= 0) { *************** *** 3255,3261 **** PreallocXlogFiles(XLogRecPtr endptr) { NextLogSeg(_logId, _logSeg); use_existent = true; ! 
lf = XLogFileInit(_logId, _logSeg, &use_existent, true); close(lf); if (!use_existent) CheckpointStats.ckpt_segs_added++; --- 3278,3284 ---- { NextLogSeg(_logId, _logSeg); use_existent = true; ! lf = XLogFileInit(ThisTimeLineID, _logId, _logSeg, &use_existent, true); close(lf); if (!use_existent) CheckpointStats.ckpt_segs_added++; *************** *** 3386,3392 **** RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr) * separate archive directory. */ if (lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) && ! InstallXLogFileSegment(&endlogId, &endlogSeg, path, true, &max_advance, true)) { ereport(DEBUG2, --- 3409,3415 ---- * separate archive directory. */ if (lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) && ! InstallXLogFileSegment(ThisTimeLineID, &endlogId, &endlogSeg, path, true, &max_advance, true)) { ereport(DEBUG2, *************** *** 5153,5159 **** BootStrapXLOG(void) /* Create first XLOG segment file */ use_existent = false; ! openLogFile = XLogFileInit(0, 1, &use_existent, false); /* Write the first page with the initial record */ errno = 0; --- 5176,5182 ---- /* Create first XLOG segment file */ use_existent = false; ! openLogFile = XLogFileInit(ThisTimeLineID, 0, 1, &use_existent, false); /* Write the first page with the initial record */ errno = 0; *************** *** 5532,5539 **** exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg) */ if (endTLI != ThisTimeLineID) { ! XLogFileCopy(endLogId, endLogSeg, ! endTLI, endLogId, endLogSeg); if (XLogArchivingActive()) { --- 5555,5563 ---- */ if (endTLI != ThisTimeLineID) { ! XLogFilePath(xlogpath, endTLI, endLogId, endLogSeg); ! XLogFileCopy(ThisTimeLineID, endLogId, endLogSeg, ! 
xlogpath, 0); if (XLogArchivingActive()) { *************** *** 8162,8167 **** CreateRestartPoint(int flags) --- 8186,8231 ---- /* Get the current (or recent) end of xlog */ endptr = GetWalRcvWriteRecPtr(NULL); + /* + * Calculate the last segment that we need to retain because of + * wal_keep_segments, by subtracting wal_keep_segments from + * current end of xlog. + */ + if (wal_keep_segments > 0) + { + uint32 log; + uint32 seg; + int d_log; + int d_seg; + + XLByteToSeg(endptr, log, seg); + + d_seg = wal_keep_segments % XLogSegsPerFile; + d_log = wal_keep_segments / XLogSegsPerFile; + if (seg < d_seg) + { + d_log += 1; + seg = seg - d_seg + XLogSegsPerFile; + } + else + seg = seg - d_seg; + /* avoid underflow, don't go below (0,1) */ + if (log < d_log || (log == d_log && seg == 0)) + { + log = 0; + seg = 1; + } + else + log = log - d_log; + + /* don't delete WAL segments newer than the calculated segment */ + if (log < _logId || (log == _logId && seg < _logSeg)) + { + _logId = log; + _logSeg = seg; + } + } + PrevLogSeg(_logId, _logSeg); RemoveOldXlogFiles(_logId, _logSeg, endptr); *************** *** 9549,9558 **** pg_last_xlog_receive_location(PG_FUNCTION_ARGS) /* * Get latest redo apply position. * * Exported to allow WALReceiver to read the pointer directly. */ XLogRecPtr ! GetXLogReplayRecPtr(void) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; --- 9613,9626 ---- /* * Get latest redo apply position. * + * Optionally, returns the end byte position of the last restored + * WAL segment. Callers not interested in that value may pass + * NULL for restoreLastRecPtr. + * * Exported to allow WALReceiver to read the pointer directly. */ XLogRecPtr ! 
GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; *************** *** 9560,9571 **** GetXLogReplayRecPtr(void) --- 9628,9661 ---- SpinLockAcquire(&xlogctl->info_lck); recptr = xlogctl->recoveryLastRecPtr; + if (restoreLastRecPtr) + *restoreLastRecPtr = xlogctl->restoreLastRecPtr; SpinLockRelease(&xlogctl->info_lck); return recptr; } /* + * Get current standby flush position, ie, the last WAL position + * known to be fsync'd to disk in standby. + */ + XLogRecPtr + GetStandbyFlushRecPtr(void) + { + XLogRecPtr receivePtr; + XLogRecPtr replayPtr; + XLogRecPtr restorePtr; + + receivePtr = GetWalRcvWriteRecPtr(NULL); + replayPtr = GetXLogReplayRecPtr(&restorePtr); + + if (XLByteLT(receivePtr, replayPtr)) + return XLByteLT(replayPtr, restorePtr) ? restorePtr : replayPtr; + else + return XLByteLT(receivePtr, restorePtr) ? restorePtr : receivePtr; + } + + /* * Report the last WAL replay location (same format as pg_start_backup etc) * * This is useful for determining how much of WAL is visible to read-only *************** *** 9577,9583 **** pg_last_xlog_replay_location(PG_FUNCTION_ARGS) XLogRecPtr recptr; char location[MAXFNAMELEN]; ! recptr = GetXLogReplayRecPtr(); if (recptr.xlogid == 0 && recptr.xrecoff == 0) PG_RETURN_NULL(); --- 9667,9673 ---- XLogRecPtr recptr; char location[MAXFNAMELEN]; ! 
recptr = GetXLogReplayRecPtr(NULL); if (recptr.xlogid == 0 && recptr.xrecoff == 0) PG_RETURN_NULL(); *************** *** 10066,10071 **** XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, --- 10156,10162 ---- } XLByteToSeg(*RecPtr, readId, readSeg); + readOff = RecPtr->xrecoff % XLogSegSize; retry: /* See if we need to retrieve more data */ *** a/src/backend/postmaster/postmaster.c --- b/src/backend/postmaster/postmaster.c *************** *** 2304,2309 **** reaper(SIGNAL_ARGS) --- 2304,2318 ---- pmState = PM_RUN; /* + * Kill the cascading walsender to urge the cascaded standby to + * reread the timeline history file, adjust its timeline and + * establish replication connection again. This is required + * because the timeline of cascading standby is not consistent + * with that of cascaded one just after failover. + */ + SignalSomeChildren(SIGUSR2, BACKEND_TYPE_WALSND); + + /* * Crank up the background writer, if we didn't do that already * when we entered consistent recovery state. It doesn't matter * if this fails, we'll just try again later. *** a/src/backend/replication/basebackup.c --- b/src/backend/replication/basebackup.c *************** *** 339,344 **** SendBaseBackup(BaseBackupCmd *cmd) --- 339,349 ---- MemoryContext old_context; basebackup_options opt; + if (cascading_walsender) + ereport(FATAL, + (errcode(ERRCODE_CANNOT_CONNECT_NOW), + errmsg("recovery is still in progress, can't accept WAL streaming connections for backup"))); + parse_basebackup_options(cmd->options, &opt); backup_context = AllocSetContextCreate(CurrentMemoryContext, *** a/src/backend/replication/syncrep.c --- b/src/backend/replication/syncrep.c *************** *** 469,474 **** SyncRepGetStandbyPriority(void) --- 469,481 ---- int priority = 0; bool found = false; + /* + * Since synchronous cascade replication is not allowed, we always + * set the priority of cascading walsender to zero. 
+ */ + if (cascading_walsender) + return 0; + /* Need a modifiable copy of string */ rawstring = pstrdup(SyncRepStandbyNames); *** a/src/backend/replication/walreceiver.c --- b/src/backend/replication/walreceiver.c *************** *** 44,49 **** --- 44,50 ---- #include "miscadmin.h" #include "replication/walprotocol.h" #include "replication/walreceiver.h" + #include "replication/walsender.h" #include "storage/ipc.h" #include "storage/pmsignal.h" #include "storage/procarray.h" *************** *** 485,491 **** XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) /* Create/use new log file */ XLByteToSeg(recptr, recvId, recvSeg); use_existent = true; ! recvFile = XLogFileInit(recvId, recvSeg, &use_existent, true); recvOff = 0; } --- 486,492 ---- /* Create/use new log file */ XLByteToSeg(recptr, recvId, recvSeg); use_existent = true; ! recvFile = XLogFileInit(ThisTimeLineID, recvId, recvSeg, &use_existent, true); recvOff = 0; } *************** *** 564,571 **** XLogWalRcvFlush(bool dying) } SpinLockRelease(&walrcv->mutex); ! /* Signal the startup process that new WAL has arrived */ WakeupRecovery(); /* Report XLOG streaming progress in PS display */ if (update_process_title) --- 565,574 ---- } SpinLockRelease(&walrcv->mutex); ! /* Signal the startup process and walsender that new WAL has arrived */ WakeupRecovery(); + if (AllowCascadeReplication()) + WalSndWakeup(); /* Report XLOG streaming progress in PS display */ if (update_process_title) *************** *** 625,631 **** XLogWalRcvSendReply(void) /* Construct a new message */ reply_message.write = LogstreamResult.Write; reply_message.flush = LogstreamResult.Flush; ! reply_message.apply = GetXLogReplayRecPtr(); reply_message.sendTime = now; elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", --- 628,634 ---- /* Construct a new message */ reply_message.write = LogstreamResult.Write; reply_message.flush = LogstreamResult.Flush; ! 
reply_message.apply = GetXLogReplayRecPtr(NULL); reply_message.sendTime = now; elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", *** a/src/backend/replication/walsender.c --- b/src/backend/replication/walsender.c *************** *** 48,53 **** --- 48,54 ---- #include "replication/basebackup.h" #include "replication/replnodes.h" #include "replication/walprotocol.h" + #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" *************** *** 70,75 **** WalSnd *MyWalSnd = NULL; --- 71,77 ---- /* Global state */ bool am_walsender = false; /* Am I a walsender process ? */ + bool cascading_walsender = false; /* Am I cascading WAL to another standby ? */ /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ *************** *** 135,144 **** WalSenderMain(void) { MemoryContext walsnd_context; ! if (RecoveryInProgress()) ! ereport(FATAL, ! (errcode(ERRCODE_CANNOT_CONNECT_NOW), ! errmsg("recovery is still in progress, can't accept WAL streaming connections"))); /* Create a per-walsender data structure in shared memory */ InitWalSnd(); --- 137,143 ---- { MemoryContext walsnd_context; ! cascading_walsender = RecoveryInProgress(); /* Create a per-walsender data structure in shared memory */ InitWalSnd(); *************** *** 165,170 **** WalSenderMain(void) --- 164,175 ---- /* Unblock signals (they were blocked when the postmaster forked us) */ PG_SETMASK(&UnBlockSig); + /* + * Use the recovery target timeline ID during recovery + */ + if (cascading_walsender) + ThisTimeLineID = GetRecoveryTargetTLI(); + /* Tell the standby that walsender is ready for receiving commands */ ReadyForQuery(DestRemote); *************** *** 290,296 **** IdentifySystem(void) GetSystemIdentifier()); snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); ! 
logptr = GetInsertRecPtr(); snprintf(xpos, sizeof(xpos), "%X/%X", logptr.xlogid, logptr.xrecoff); --- 295,301 ---- GetSystemIdentifier()); snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); ! logptr = cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr(); snprintf(xpos, sizeof(xpos), "%X/%X", logptr.xlogid, logptr.xrecoff); *************** *** 372,379 **** StartReplication(StartReplicationCmd *cmd) * directory that was created with 'minimal'. So this is not bulletproof, * the purpose is just to give a user-friendly error message that hints * how to configure the system correctly. */ ! if (wal_level == WAL_LEVEL_MINIMAL) ereport(FATAL, (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("standby connections not allowed because wal_level=minimal"))); --- 377,388 ---- * directory that was created with 'minimal'. So this is not bulletproof, * the purpose is just to give a user-friendly error message that hints * how to configure the system correctly. + * + * NOTE: The existence of cascading walsender means that wal_level is set + * to hot_standby in the master. So we don't need to check the value of + * wal_level during recovery. */ ! if (!cascading_walsender && wal_level == WAL_LEVEL_MINIMAL) ereport(FATAL, (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("standby connections not allowed because wal_level=minimal"))); *************** *** 601,607 **** ProcessStandbyReplyMessage(void) SpinLockRelease(&walsnd->mutex); } ! SyncRepReleaseWaiters(); } /* --- 610,617 ---- SpinLockRelease(&walsnd->mutex); } ! if (!cascading_walsender) ! SyncRepReleaseWaiters(); } /* *************** *** 1079,1085 **** XLogSend(char *msgbuf, bool *caughtup) * subsequently crashes and restarts, slaves must not have applied any WAL * that gets lost on the master. */ ! SendRqstPtr = GetFlushRecPtr(); /* Quick exit if nothing to do */ if (XLByteLE(SendRqstPtr, sentPtr)) --- 1089,1095 ---- * subsequently crashes and restarts, slaves must not have applied any WAL * that gets lost on the master. */ ! 
SendRqstPtr = cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr(); /* Quick exit if nothing to do */ if (XLByteLE(SendRqstPtr, sentPtr)) *** a/src/include/access/xlog.h --- b/src/include/access/xlog.h *************** *** 221,226 **** extern int wal_level; --- 221,229 ---- /* Do we need to WAL-log information required only for Hot Standby? */ #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_HOT_STANDBY) + /* Can we allow the standby to accept replication connection from another standby? */ + #define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0) + #ifdef WAL_DEBUG extern bool XLOG_DEBUG; #endif *************** *** 273,279 **** extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata); extern void XLogFlush(XLogRecPtr RecPtr); extern void XLogBackgroundFlush(void); extern bool XLogNeedsFlush(XLogRecPtr RecPtr); ! extern int XLogFileInit(uint32 log, uint32 seg, bool *use_existent, bool use_lock); extern int XLogFileOpen(uint32 log, uint32 seg); --- 276,282 ---- extern void XLogFlush(XLogRecPtr RecPtr); extern void XLogBackgroundFlush(void); extern bool XLogNeedsFlush(XLogRecPtr RecPtr); ! extern int XLogFileInit(TimeLineID tli, uint32 log, uint32 seg, bool *use_existent, bool use_lock); extern int XLogFileOpen(uint32 log, uint32 seg); *************** *** 292,298 **** extern bool RecoveryInProgress(void); extern bool HotStandbyActive(void); extern bool XLogInsertAllowed(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); ! extern XLogRecPtr GetXLogReplayRecPtr(void); extern void UpdateControlFile(void); extern uint64 GetSystemIdentifier(void); --- 295,302 ---- extern bool HotStandbyActive(void); extern bool XLogInsertAllowed(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); ! extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr); ! 
extern XLogRecPtr GetStandbyFlushRecPtr(void); extern void UpdateControlFile(void); extern uint64 GetSystemIdentifier(void); *** a/src/include/replication/walsender.h --- b/src/include/replication/walsender.h *************** *** 92,97 **** extern WalSndCtlData *WalSndCtl; --- 92,98 ---- /* global state */ extern bool am_walsender; + extern bool cascading_walsender; extern volatile sig_atomic_t walsender_shutdown_requested; extern volatile sig_atomic_t walsender_ready_to_stop;
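The WAL-position arithmetic in the xlog.c changes can be modelled in isolation. This is a simplified sketch, not code from the patch: RecPtr, SEG_SIZE, SEGS_PER_FILE, FILE_SIZE and the helper names are hypothetical, and the default 16 MB segment size is assumed. It mirrors three pieces of the patch: GetStandbyFlushRecPtr() taking the newest of the received, replayed and restored positions; the end-of-segment pointer that XLogFileRead() stores in restoreLastRecPtr (the carry into the next xlogid is assumed to follow XLByteAdvance of this era); and the wal_keep_segments subtraction added to CreateRestartPoint().

```c
#include <stdint.h>

/* Simplified model of the two-part WAL position (hypothetical type). */
typedef struct
{
    uint32_t xlogid;   /* log file number */
    uint32_t xrecoff;  /* byte offset within that log file */
} RecPtr;

/* Assumes the default 16 MB segment size. */
#define SEG_SIZE       ((uint32_t) 16 * 1024 * 1024)
#define SEGS_PER_FILE  (UINT32_C(0xffffffff) / SEG_SIZE)  /* 255 */
#define FILE_SIZE      (SEGS_PER_FILE * SEG_SIZE)

/* Returns nonzero if a precedes b, like XLByteLT. */
static int
rec_lt(RecPtr a, RecPtr b)
{
    return a.xlogid < b.xlogid ||
        (a.xlogid == b.xlogid && a.xrecoff < b.xrecoff);
}

/* Newest of the three positions, as in GetStandbyFlushRecPtr(). */
RecPtr
standby_flush_ptr(RecPtr receive, RecPtr replay, RecPtr restore)
{
    RecPtr newest = rec_lt(receive, replay) ? replay : receive;

    return rec_lt(newest, restore) ? restore : newest;
}

/* End of segment (logid, segno): its start advanced by one segment,
 * carrying into the next xlogid when the log file is full. */
RecPtr
segment_end(uint32_t logid, uint32_t segno)
{
    RecPtr end = {logid, segno * SEG_SIZE};

    if (end.xrecoff + SEG_SIZE >= FILE_SIZE)
    {
        end.xlogid += 1;
        end.xrecoff = end.xrecoff + SEG_SIZE - FILE_SIZE;
    }
    else
        end.xrecoff += SEG_SIZE;
    return end;
}

/* Oldest segment to keep: subtract keep from (logid, segno) with a
 * borrow, never going below (0, 1). */
void
keep_segments_floor(uint32_t *logid, uint32_t *segno, uint32_t keep)
{
    uint32_t d_seg = keep % SEGS_PER_FILE;
    uint32_t d_log = keep / SEGS_PER_FILE;

    if (*segno < d_seg)
    {
        d_log += 1;                         /* borrow one log file */
        *segno = *segno + SEGS_PER_FILE - d_seg;
    }
    else
        *segno -= d_seg;

    if (*logid < d_log || (*logid == d_log && *segno == 0))
    {
        *logid = 0;                         /* clamp at (0, 1) */
        *segno = 1;
    }
    else
        *logid -= d_log;
}
```

For example, with 255 segments per log file, keeping 20 segments back from (2, 10) borrows from the log file number and yields (1, 245), while the same subtraction from (0, 5) would underflow and is clamped to (0, 1).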
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers