On Mon, Jul 11, 2011 at 10:26 AM, Fujii Masao <masao.fu...@gmail.com> wrote:
> On Mon, Jul 11, 2011 at 3:30 AM, Josh Berkus <j...@agliodbs.com> wrote:
>> Do you think you'll submit a new version of the patch this commitfest?
>
> Yes. I'm now updating the patch according to Simon's comments.
> I will submit it today.

Attached is the updated version which addresses all the issues raised by Simon.

> The risk you describe already exists in current code.
>
> I regard it as a non-risk. The unlink() and the rename() are executed
> consecutively, so the gap between them is small, so the chance of a
> SIGKILL in that gap at the same time as losing the archive seems low,
> and we can always get that file from the master again if we are
> streaming. Any code you add to "fix" this will get executed so rarely
> it probably won't work when we need it to.
>
> In the current scheme we restart archiving from the last restartpoint,
> which exists only on the archive. This new patch improves upon this by
> keeping the most recent files locally, so we are less expose in the
> case of archive unavailability. So this patch already improves things
> and we don't need any more than that. No extra code please, IMHO.

Yes, I added no extra code for the risk I raised upthread.

> In #2, there is another problem; walsender might have the pre-existing file
> open, so the startup process would need to request walsenders to close the
> file before removing (or renaming) it, wait for new file to appear and open it
> again.

I implemented this.

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 1949,1954 **** SET ENABLE_SEQSCAN TO OFF;
--- 1949,1956 ----
        The values of these parameters on standby servers are irrelevant,
        although you may wish to set them there in preparation for the
        possibility of a standby becoming the master.
+       Some of them need to be set in the standby for cascade replication
+       (see <xref linkend="cascade-replication">).
       </para>
  
       <variablelist>
***************
*** 2019,2025 **** SET ENABLE_SEQSCAN TO OFF;
          doesn't keep any extra segments for standby purposes, so the number
          of old WAL segments available to standby servers is a function of
          the location of the previous checkpoint and status of WAL
!         archiving.  This parameter has no effect on restartpoints.
          This parameter can only be set in the
          <filename>postgresql.conf</> file or on the server command line.
         </para>
--- 2021,2027 ----
          doesn't keep any extra segments for standby purposes, so the number
          of old WAL segments available to standby servers is a function of
          the location of the previous checkpoint and status of WAL
!         archiving.
          This parameter can only be set in the
          <filename>postgresql.conf</> file or on the server command line.
         </para>
***************
*** 2121,2127 **** SET ENABLE_SEQSCAN TO OFF;
          synchronous replication is enabled, individual transactions can be
          configured not to wait for replication by setting the
          <xref linkend="guc-synchronous-commit"> parameter to
!         <literal>local</> or <literal>off</>.
         </para>
         <para>
          This parameter can only be set in the <filename>postgresql.conf</>
--- 2123,2130 ----
          synchronous replication is enabled, individual transactions can be
          configured not to wait for replication by setting the
          <xref linkend="guc-synchronous-commit"> parameter to
!         <literal>local</> or <literal>off</>. This parameter has no effect on
!         cascade replication.
         </para>
         <para>
          This parameter can only be set in the <filename>postgresql.conf</>
*** a/doc/src/sgml/high-availability.sgml
--- b/doc/src/sgml/high-availability.sgml
***************
*** 877,884 **** primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
--- 877,921 ----
       network delay, or that the standby is under heavy load.
      </para>
     </sect3>
+   </sect2>
+ 
+   <sect2 id="cascade-replication">
+    <title>Cascade Replication</title>
  
+    <indexterm zone="high-availability">
+     <primary>Cascade Replication</primary>
+    </indexterm>
+    <para>
+     Cascade replication feature allows the standby to accept the replication
+     connections and stream WAL records to another standbys. This is useful
+     for reducing the number of standbys connecting to the master and reducing
+     the overhead of the master, when you have many standbys.
+    </para>
+    <para>
+     The cascading standby sends not only WAL records received from the
+     master but also those restored from the archive. So even if the replication
+     connection in higher level is terminated, you can continue cascade replication.
+    </para>
+    <para>
+     Cascade replication is asynchronous. Note that synchronous replication
+     (see <xref linkend="synchronous-replication">) has no effect on cascade
+     replication.
+    </para>
+    <para>
+     Promoting the cascading standby terminates all the cascade replication
+     connections which it uses. This is because the timeline becomes different
+     between standbys, and they cannot continue replication any more.
+    </para>
+    <para>
+     To use cascade replication, set up the cascading standby so that it can
+     accept the replication connections, i.e., set <varname>max_wal_senders</>,
+     <varname>hot_standby</> and authentication option (see
+     <xref linkend="streaming-replication"> and <xref linkend="hot-standby">).
+     Also set <varname>primary_conninfo</> in the cascaded standby to point
+     to the cascading standby.
+    </para>
    </sect2>
+ 
    <sect2 id="synchronous-replication">
     <title>Synchronous Replication</title>
  
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 446,451 **** typedef struct XLogCtlData
--- 446,453 ----
  	XLogRecPtr	recoveryLastRecPtr;
  	/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
  	TimestampTz recoveryLastXTime;
+ 	/* end of the last record restored from the archive */
+ 	XLogRecPtr	restoreLastRecPtr;
  	/* Are we requested to pause recovery? */
  	bool		recoveryPause;
  
***************
*** 612,617 **** static void CheckRequiredParameterValues(void);
--- 614,620 ----
  static void XLogReportParameters(void);
  static void LocalSetXLogInsertAllowed(void);
  static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
+ static void WalKeepSegments(uint32 *logId, uint32 *logSeg, XLogRecPtr recptr);
  
  static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
  				XLogRecPtr *lsn, BkpBlock *bkpb);
***************
*** 2729,2734 **** XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
--- 2732,2792 ----
  			elog(ERROR, "invalid XLogFileRead source %d", source);
  	}
  
+ 	/*
+ 	 * If the segment was fetched from archival storage, replace
+ 	 * the existing xlog segment (if any) with the archival version.
+ 	 */
+ 	if (source == XLOG_FROM_ARCHIVE)
+ 	{
+ 		/* use volatile pointer to prevent code rearrangement */
+ 		volatile XLogCtlData *xlogctl = XLogCtl;
+ 		XLogRecPtr		endptr;
+ 		char			xlogfpath[MAXPGPATH];
+ 		bool			reload = false;
+ 		struct stat		statbuf;
+ 
+ 		XLogFilePath(xlogfpath, tli, log, seg);
+ 		if (stat(xlogfpath, &statbuf) == 0)
+ 		{
+ 			if (unlink(xlogfpath) != 0)
+ 				ereport(FATAL,
+ 						(errcode_for_file_access(),
+ 						 errmsg("could not remove file \"%s\": %m",
+ 								xlogfpath)));
+ 			reload = true;
+ 		}
+ 
+ 		if (rename(path, xlogfpath) < 0)
+ 			ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("could not rename file \"%s\" to \"%s\": %m",
+ 						path, xlogfpath)));
+ 
+ 		/*
+ 		 * If the existing segment was replaced, since walsenders might have
+ 		 * it open, request them to reload a currently-open segment.
+ 		 */
+ 		if (reload)
+ 			WalSndRqstFileReload();
+ 
+ 		/*
+ 		 * Calculate the end location of the restored WAL file and save it in
+ 		 * shmem. It's used as current standby flush position, and cascading
+ 		 * walsenders try to send WAL records up to this location.
+ 		 */
+ 		endptr.xlogid = log;
+ 		endptr.xrecoff = seg * XLogSegSize;
+ 		XLByteAdvance(endptr, XLogSegSize);
+ 
+ 		SpinLockAcquire(&xlogctl->info_lck);
+ 		xlogctl->restoreLastRecPtr = endptr;
+ 		SpinLockRelease(&xlogctl->info_lck);
+ 
+ 		/* Signal walsender that new WAL has arrived */
+ 		if (AllowCascadeReplication())
+ 			WalSndWakeup();
+ 	}
+ 
  	fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
  	if (fd >= 0)
  	{
***************
*** 3361,3378 **** RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
  			strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
  			strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
  		{
! 			/*
! 			 * Normally we don't delete old XLOG files during recovery to
! 			 * avoid accidentally deleting a file that looks stale due to a
! 			 * bug or hardware issue, but in fact contains important data.
! 			 * During streaming recovery, however, we will eventually fill the
! 			 * disk if we never clean up, so we have to. That's not an issue
! 			 * with file-based archive recovery because in that case we
! 			 * restore one XLOG file at a time, on-demand, and with a
! 			 * different filename that can't be confused with regular XLOG
! 			 * files.
! 			 */
! 			if (WalRcvInProgress() || XLogArchiveCheckDone(xlde->d_name))
  			{
  				snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
  
--- 3419,3425 ----
  			strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
  			strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
  		{
! 			if (RecoveryInProgress() || XLogArchiveCheckDone(xlde->d_name))
  			{
  				snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
  
***************
*** 5484,5545 **** exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
  	}
  
  	/*
! 	 * If the segment was fetched from archival storage, we want to replace
! 	 * the existing xlog segment (if any) with the archival version.  This is
! 	 * because whatever is in XLOGDIR is very possibly older than what we have
! 	 * from the archives, since it could have come from restoring a PGDATA
! 	 * backup.	In any case, the archival version certainly is more
! 	 * descriptive of what our current database state is, because that is what
! 	 * we replayed from.
  	 *
! 	 * Note that if we are establishing a new timeline, ThisTimeLineID is
! 	 * already set to the new value, and so we will create a new file instead
! 	 * of overwriting any existing file.  (This is, in fact, always the case
! 	 * at present.)
  	 */
! 	snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
! 	XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
! 
! 	if (restoredFromArchive)
  	{
! 		ereport(DEBUG3,
! 				(errmsg_internal("moving last restored xlog to \"%s\"",
! 								 xlogpath)));
! 		unlink(xlogpath);		/* might or might not exist */
! 		if (rename(recoveryPath, xlogpath) != 0)
! 			ereport(FATAL,
! 					(errcode_for_file_access(),
! 					 errmsg("could not rename file \"%s\" to \"%s\": %m",
! 							recoveryPath, xlogpath)));
! 		/* XXX might we need to fix permissions on the file? */
! 	}
! 	else
! 	{
! 		/*
! 		 * If the latest segment is not archival, but there's still a
! 		 * RECOVERYXLOG laying about, get rid of it.
! 		 */
! 		unlink(recoveryPath);	/* ignore any error */
  
! 		/*
! 		 * If we are establishing a new timeline, we have to copy data from
! 		 * the last WAL segment of the old timeline to create a starting WAL
! 		 * segment for the new timeline.
! 		 *
! 		 * Notify the archiver that the last WAL segment of the old timeline
! 		 * is ready to copy to archival storage. Otherwise, it is not archived
! 		 * for a while.
! 		 */
! 		if (endTLI != ThisTimeLineID)
  		{
! 			XLogFileCopy(endLogId, endLogSeg,
! 						 endTLI, endLogId, endLogSeg);
! 
! 			if (XLogArchivingActive())
! 			{
! 				XLogFileName(xlogpath, endTLI, endLogId, endLogSeg);
! 				XLogArchiveNotify(xlogpath);
! 			}
  		}
  	}
  
--- 5531,5553 ----
  	}
  
  	/*
! 	 * If we are establishing a new timeline, we have to copy data from
! 	 * the last WAL segment of the old timeline to create a starting WAL
! 	 * segment for the new timeline.
  	 *
! 	 * Notify the archiver that the last WAL segment of the old timeline
! 	 * is ready to copy to archival storage. Otherwise, it is not archived
! 	 * for a while.
  	 */
! 	if (endTLI != ThisTimeLineID)
  	{
! 		XLogFileCopy(endLogId, endLogSeg,
! 					 endTLI, endLogId, endLogSeg);
  
! 		if (XLogArchivingActive())
  		{
! 			XLogFileName(xlogpath, endTLI, endLogId, endLogSeg);
! 			XLogArchiveNotify(xlogpath);
  		}
  	}
  
***************
*** 5550,5555 **** exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
--- 5558,5570 ----
  	XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
  	XLogArchiveCleanup(xlogpath);
  
+ 	/*
+ 	 * Since there might be a partial WAL segment named RECOVERYXLOG,
+ 	 * get rid of it.
+ 	 */
+ 	snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
+ 	unlink(recoveryPath);		/* ignore any error */
+ 
  	/* Get rid of any remaining recovered timeline-history file, too */
  	snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYHISTORY");
  	unlink(recoveryPath);		/* ignore any error */
***************
*** 7871,7916 **** CreateCheckPoint(int flags)
  	 */
  	if (_logId || _logSeg)
  	{
! 		/*
! 		 * Calculate the last segment that we need to retain because of
! 		 * wal_keep_segments, by subtracting wal_keep_segments from the new
! 		 * checkpoint location.
! 		 */
! 		if (wal_keep_segments > 0)
! 		{
! 			uint32		log;
! 			uint32		seg;
! 			int			d_log;
! 			int			d_seg;
! 
! 			XLByteToSeg(recptr, log, seg);
! 
! 			d_seg = wal_keep_segments % XLogSegsPerFile;
! 			d_log = wal_keep_segments / XLogSegsPerFile;
! 			if (seg < d_seg)
! 			{
! 				d_log += 1;
! 				seg = seg - d_seg + XLogSegsPerFile;
! 			}
! 			else
! 				seg = seg - d_seg;
! 			/* avoid underflow, don't go below (0,1) */
! 			if (log < d_log || (log == d_log && seg == 0))
! 			{
! 				log = 0;
! 				seg = 1;
! 			}
! 			else
! 				log = log - d_log;
! 
! 			/* don't delete WAL segments newer than the calculated segment */
! 			if (log < _logId || (log == _logId && seg < _logSeg))
! 			{
! 				_logId = log;
! 				_logSeg = seg;
! 			}
! 		}
! 
  		PrevLogSeg(_logId, _logSeg);
  		RemoveOldXlogFiles(_logId, _logSeg, recptr);
  	}
--- 7886,7892 ----
  	 */
  	if (_logId || _logSeg)
  	{
! 		WalKeepSegments(&_logId, &_logSeg, recptr);
  		PrevLogSeg(_logId, _logSeg);
  		RemoveOldXlogFiles(_logId, _logSeg, recptr);
  	}
***************
*** 8151,8167 **** CreateRestartPoint(int flags)
  	/*
  	 * Delete old log files (those no longer needed even for previous
  	 * checkpoint/restartpoint) to prevent the disk holding the xlog from
! 	 * growing full. We don't need do this during normal recovery, but during
! 	 * streaming recovery we have to or the disk will eventually fill up from
! 	 * old log files streamed from master.
  	 */
! 	if (WalRcvInProgress() && (_logId || _logSeg))
  	{
  		XLogRecPtr	endptr;
  
  		/* Get the current (or recent) end of xlog */
! 		endptr = GetWalRcvWriteRecPtr(NULL);
  
  		PrevLogSeg(_logId, _logSeg);
  		RemoveOldXlogFiles(_logId, _logSeg, endptr);
  
--- 8127,8142 ----
  	/*
  	 * Delete old log files (those no longer needed even for previous
  	 * checkpoint/restartpoint) to prevent the disk holding the xlog from
! 	 * growing full.
  	 */
! 	if (_logId || _logSeg)
  	{
  		XLogRecPtr	endptr;
  
  		/* Get the current (or recent) end of xlog */
! 		endptr = GetStandbyFlushRecPtr();
  
+ 		WalKeepSegments(&_logId, &_logSeg, endptr);
  		PrevLogSeg(_logId, _logSeg);
  		RemoveOldXlogFiles(_logId, _logSeg, endptr);
  
***************
*** 8207,8212 **** CreateRestartPoint(int flags)
--- 8182,8231 ----
  }
  
  /*
+  * Calculate the last segment that we need to retain because of
+  * wal_keep_segments, by subtracting wal_keep_segments from
+  * the given location.
+  */
+ static void
+ WalKeepSegments(uint32 *logId, uint32 *logSeg, XLogRecPtr recptr)
+ {
+ 	uint32		log;
+ 	uint32		seg;
+ 	int			d_log;
+ 	int			d_seg;
+ 
+ 	if (wal_keep_segments == 0)
+ 		return;
+ 
+ 	XLByteToSeg(recptr, log, seg);
+ 
+ 	d_seg = wal_keep_segments % XLogSegsPerFile;
+ 	d_log = wal_keep_segments / XLogSegsPerFile;
+ 	if (seg < d_seg)
+ 	{
+ 		d_log += 1;
+ 		seg = seg - d_seg + XLogSegsPerFile;
+ 	}
+ 	else
+ 		seg = seg - d_seg;
+ 	/* avoid underflow, don't go below (0,1) */
+ 	if (log < d_log || (log == d_log && seg == 0))
+ 	{
+ 		log = 0;
+ 		seg = 1;
+ 	}
+ 	else
+ 		log = log - d_log;
+ 
+ 	/* don't delete WAL segments newer than the calculated segment */
+ 	if (log < *logId || (log == *logId && seg < *logSeg))
+ 	{
+ 		*logId = log;
+ 		*logSeg = seg;
+ 	}
+ }
+ 
+ /*
   * Write a NEXTOID log record
   */
  void
***************
*** 9549,9558 **** pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
  /*
   * Get latest redo apply position.
   *
   * Exported to allow WALReceiver to read the pointer directly.
   */
  XLogRecPtr
! GetXLogReplayRecPtr(void)
  {
  	/* use volatile pointer to prevent code rearrangement */
  	volatile XLogCtlData *xlogctl = XLogCtl;
--- 9568,9581 ----
  /*
   * Get latest redo apply position.
   *
+  * Optionally, returns the end byte position of the last restored
+  * WAL segment. Callers not interested in that value may pass
+  * NULL for restoreLastRecPtr.
+  *
   * Exported to allow WALReceiver to read the pointer directly.
   */
  XLogRecPtr
! GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
  {
  	/* use volatile pointer to prevent code rearrangement */
  	volatile XLogCtlData *xlogctl = XLogCtl;
***************
*** 9560,9571 **** GetXLogReplayRecPtr(void)
--- 9583,9616 ----
  
  	SpinLockAcquire(&xlogctl->info_lck);
  	recptr = xlogctl->recoveryLastRecPtr;
+ 	if (restoreLastRecPtr)
+ 		*restoreLastRecPtr = xlogctl->restoreLastRecPtr;
  	SpinLockRelease(&xlogctl->info_lck);
  
  	return recptr;
  }
  
  /*
+  * Get current standby flush position, ie, the last WAL position
+  * known to be fsync'd to disk in standby.
+  */
+ XLogRecPtr
+ GetStandbyFlushRecPtr(void)
+ {
+ 	XLogRecPtr	receivePtr;
+ 	XLogRecPtr	replayPtr;
+ 	XLogRecPtr	restorePtr;
+ 
+ 	receivePtr = GetWalRcvWriteRecPtr(NULL);
+ 	replayPtr = GetXLogReplayRecPtr(&restorePtr);
+ 
+ 	if (XLByteLT(receivePtr, replayPtr))
+ 		return XLByteLT(replayPtr, restorePtr) ? restorePtr : replayPtr;
+ 	else
+ 		return XLByteLT(receivePtr, restorePtr) ? restorePtr : receivePtr;
+ }
+ 
+ /*
   * Report the last WAL replay location (same format as pg_start_backup etc)
   *
   * This is useful for determining how much of WAL is visible to read-only
***************
*** 9577,9583 **** pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
  	XLogRecPtr	recptr;
  	char		location[MAXFNAMELEN];
  
! 	recptr = GetXLogReplayRecPtr();
  
  	if (recptr.xlogid == 0 && recptr.xrecoff == 0)
  		PG_RETURN_NULL();
--- 9622,9628 ----
  	XLogRecPtr	recptr;
  	char		location[MAXFNAMELEN];
  
! 	recptr = GetXLogReplayRecPtr(NULL);
  
  	if (recptr.xlogid == 0 && recptr.xrecoff == 0)
  		PG_RETURN_NULL();
*** a/src/backend/postmaster/postmaster.c
--- b/src/backend/postmaster/postmaster.c
***************
*** 2317,2322 **** reaper(SIGNAL_ARGS)
--- 2317,2331 ----
  			pmState = PM_RUN;
  
  			/*
+ 			 * Kill the cascading walsender to urge the cascaded standby to
+ 			 * reread the timeline history file, adjust its timeline and
+ 			 * establish replication connection again. This is required
+ 			 * because the timeline of cascading standby is not consistent
+ 			 * with that of cascaded one just after failover.
+ 			 */
+ 			SignalSomeChildren(SIGUSR2, BACKEND_TYPE_WALSND);
+ 
+ 			/*
  			 * Crank up the background writer, if we didn't do that already
  			 * when we entered consistent recovery state.  It doesn't matter
  			 * if this fails, we'll just try again later.
*** a/src/backend/replication/basebackup.c
--- b/src/backend/replication/basebackup.c
***************
*** 339,344 **** SendBaseBackup(BaseBackupCmd *cmd)
--- 339,349 ----
  	MemoryContext old_context;
  	basebackup_options opt;
  
+ 	if (cascading_walsender)
+ 		ereport(FATAL,
+ 				(errcode(ERRCODE_CANNOT_CONNECT_NOW),
+ 				 errmsg("recovery is still in progress, can't accept WAL streaming connections for backup")));
+ 
  	parse_basebackup_options(cmd->options, &opt);
  
  	backup_context = AllocSetContextCreate(CurrentMemoryContext,
*** a/src/backend/replication/syncrep.c
--- b/src/backend/replication/syncrep.c
***************
*** 469,474 **** SyncRepGetStandbyPriority(void)
--- 469,481 ----
  	int			priority = 0;
  	bool		found = false;
  
+ 	/*
+ 	 * Since synchronous cascade replication is not allowed, we always
+ 	 * set the priority of cascading walsender to zero.
+ 	 */
+ 	if (cascading_walsender)
+ 		return 0;
+ 
  	/* Need a modifiable copy of string */
  	rawstring = pstrdup(SyncRepStandbyNames);
  
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 44,49 ****
--- 44,50 ----
  #include "miscadmin.h"
  #include "replication/walprotocol.h"
  #include "replication/walreceiver.h"
+ #include "replication/walsender.h"
  #include "storage/ipc.h"
  #include "storage/pmsignal.h"
  #include "storage/procarray.h"
***************
*** 564,571 **** XLogWalRcvFlush(bool dying)
  		}
  		SpinLockRelease(&walrcv->mutex);
  
! 		/* Signal the startup process that new WAL has arrived */
  		WakeupRecovery();
  
  		/* Report XLOG streaming progress in PS display */
  		if (update_process_title)
--- 565,574 ----
  		}
  		SpinLockRelease(&walrcv->mutex);
  
! 		/* Signal the startup process and walsender that new WAL has arrived */
  		WakeupRecovery();
+ 		if (AllowCascadeReplication())
+ 			WalSndWakeup();
  
  		/* Report XLOG streaming progress in PS display */
  		if (update_process_title)
***************
*** 625,631 **** XLogWalRcvSendReply(void)
  	/* Construct a new message */
  	reply_message.write = LogstreamResult.Write;
  	reply_message.flush = LogstreamResult.Flush;
! 	reply_message.apply = GetXLogReplayRecPtr();
  	reply_message.sendTime = now;
  
  	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
--- 628,634 ----
  	/* Construct a new message */
  	reply_message.write = LogstreamResult.Write;
  	reply_message.flush = LogstreamResult.Flush;
! 	reply_message.apply = GetXLogReplayRecPtr(NULL);
  	reply_message.sendTime = now;
  
  	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 48,53 ****
--- 48,54 ----
  #include "replication/basebackup.h"
  #include "replication/replnodes.h"
  #include "replication/walprotocol.h"
+ #include "replication/walreceiver.h"
  #include "replication/walsender.h"
  #include "storage/fd.h"
  #include "storage/ipc.h"
***************
*** 70,75 **** WalSnd	   *MyWalSnd = NULL;
--- 71,77 ----
  
  /* Global state */
  bool		am_walsender = false;		/* Am I a walsender process ? */
+ bool		cascading_walsender = false;	/* Am I cascading WAL to another standby ? */
  
  /* User-settable parameters for walsender */
  int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
***************
*** 135,144 **** WalSenderMain(void)
  {
  	MemoryContext walsnd_context;
  
! 	if (RecoveryInProgress())
! 		ereport(FATAL,
! 				(errcode(ERRCODE_CANNOT_CONNECT_NOW),
! 				 errmsg("recovery is still in progress, can't accept WAL streaming connections")));
  
  	/* Create a per-walsender data structure in shared memory */
  	InitWalSnd();
--- 137,143 ----
  {
  	MemoryContext walsnd_context;
  
! 	cascading_walsender = RecoveryInProgress();
  
  	/* Create a per-walsender data structure in shared memory */
  	InitWalSnd();
***************
*** 165,170 **** WalSenderMain(void)
--- 164,175 ----
  	/* Unblock signals (they were blocked when the postmaster forked us) */
  	PG_SETMASK(&UnBlockSig);
  
+ 	/*
+ 	 * Use the recovery target timeline ID during recovery
+ 	 */
+ 	if (cascading_walsender)
+ 		ThisTimeLineID = GetRecoveryTargetTLI();
+ 
  	/* Tell the standby that walsender is ready for receiving commands */
  	ReadyForQuery(DestRemote);
  
***************
*** 290,296 **** IdentifySystem(void)
  			 GetSystemIdentifier());
  	snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
  
! 	logptr = GetInsertRecPtr();
  
  	snprintf(xpos, sizeof(xpos), "%X/%X",
  			 logptr.xlogid, logptr.xrecoff);
--- 295,301 ----
  			 GetSystemIdentifier());
  	snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
  
! 	logptr = cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
  
  	snprintf(xpos, sizeof(xpos), "%X/%X",
  			 logptr.xlogid, logptr.xrecoff);
***************
*** 372,379 **** StartReplication(StartReplicationCmd *cmd)
  	 * directory that was created with 'minimal'. So this is not bulletproof,
  	 * the purpose is just to give a user-friendly error message that hints
  	 * how to configure the system correctly.
  	 */
! 	if (wal_level == WAL_LEVEL_MINIMAL)
  		ereport(FATAL,
  				(errcode(ERRCODE_CANNOT_CONNECT_NOW),
  		errmsg("standby connections not allowed because wal_level=minimal")));
--- 377,388 ----
  	 * directory that was created with 'minimal'. So this is not bulletproof,
  	 * the purpose is just to give a user-friendly error message that hints
  	 * how to configure the system correctly.
+ 	 *
+ 	 * NOTE: The existence of cascading walsender means that wal_level is set
+ 	 * to hot_standby in the master. So we don't need to check the value of
+ 	 * wal_level during recovery.
  	 */
! 	if (!cascading_walsender && wal_level == WAL_LEVEL_MINIMAL)
  		ereport(FATAL,
  				(errcode(ERRCODE_CANNOT_CONNECT_NOW),
  		errmsg("standby connections not allowed because wal_level=minimal")));
***************
*** 601,607 **** ProcessStandbyReplyMessage(void)
  		SpinLockRelease(&walsnd->mutex);
  	}
  
! 	SyncRepReleaseWaiters();
  }
  
  /*
--- 610,617 ----
  		SpinLockRelease(&walsnd->mutex);
  	}
  
! 	if (!cascading_walsender)
! 		SyncRepReleaseWaiters();
  }
  
  /*
***************
*** 770,776 **** WalSndLoop(void)
--- 780,793 ----
  			XLogSend(output_message, &caughtup);
  			ProcessRepliesIfAny();
  			if (caughtup && !pq_is_send_pending())
+ 			{
  				walsender_shutdown_requested = true;
+ 
+ 				if (cascading_walsender)
+ 					ereport(LOG,
+ 							(errmsg("terminating walsender process to request the cascaded "
+ 									"standby to update timeline and reconnect")));
+ 			}
  		}
  
  		if ((caughtup || pq_is_send_pending()) &&
***************
*** 933,939 **** WalSndKill(int code, Datum arg)
  }
  
  /*
!  * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
   *
   * XXX probably this should be improved to suck data directly from the
   * WAL buffers when possible.
--- 950,956 ----
  }
  
  /*
!  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
   *
   * XXX probably this should be improved to suck data directly from the
   * WAL buffers when possible.
***************
*** 944,958 **** WalSndKill(int code, Datum arg)
   * more than one.
   */
  void
! XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  {
! 	XLogRecPtr	startRecPtr = recptr;
! 	char		path[MAXPGPATH];
  	uint32		lastRemovedLog;
  	uint32		lastRemovedSeg;
  	uint32		log;
  	uint32		seg;
  
  	while (nbytes > 0)
  	{
  		uint32		startoff;
--- 961,981 ----
   * more than one.
   */
  void
! XLogRead(char *buf, XLogRecPtr startptr, Size count)
  {
! 	char		   *p;
! 	XLogRecPtr	recptr;
! 	Size			nbytes;
  	uint32		lastRemovedLog;
  	uint32		lastRemovedSeg;
  	uint32		log;
  	uint32		seg;
  
+ retry:
+ 	p = buf;
+ 	recptr = startptr;
+ 	nbytes = count;
+ 
  	while (nbytes > 0)
  	{
  		uint32		startoff;
***************
*** 963,968 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
--- 986,993 ----
  
  		if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
  		{
+ 			char		path[MAXPGPATH];
+ 
  			/* Switch to another logfile segment */
  			if (sendFile >= 0)
  				close(sendFile);
***************
*** 1014,1020 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  		else
  			segbytes = nbytes;
  
! 		readbytes = read(sendFile, buf, segbytes);
  		if (readbytes <= 0)
  			ereport(ERROR,
  					(errcode_for_file_access(),
--- 1039,1045 ----
  		else
  			segbytes = nbytes;
  
! 		readbytes = read(sendFile, p, segbytes);
  		if (readbytes <= 0)
  			ereport(ERROR,
  					(errcode_for_file_access(),
***************
*** 1027,1033 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  
  		sendOff += readbytes;
  		nbytes -= readbytes;
! 		buf += readbytes;
  	}
  
  	/*
--- 1052,1058 ----
  
  		sendOff += readbytes;
  		nbytes -= readbytes;
! 		p += readbytes;
  	}
  
  	/*
***************
*** 1038,1044 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  	 * already have been overwritten with new WAL records.
  	 */
  	XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
! 	XLByteToSeg(startRecPtr, log, seg);
  	if (log < lastRemovedLog ||
  		(log == lastRemovedLog && seg <= lastRemovedSeg))
  	{
--- 1063,1069 ----
  	 * already have been overwritten with new WAL records.
  	 */
  	XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
! 	XLByteToSeg(startptr, log, seg);
  	if (log < lastRemovedLog ||
  		(log == lastRemovedLog && seg <= lastRemovedSeg))
  	{
***************
*** 1050,1055 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
--- 1075,1106 ----
  				 errmsg("requested WAL segment %s has already been removed",
  						filename)));
  	}
+ 
+ 	/*
+ 	 * During recovery, the currently-open WAL file might be replaced with
+ 	 * the file of the same name retrieved from archive. So we always need
+ 	 * to check what we read was valid after reading into the buffer. If it's
+ 	 * invalid, we try to open and read the file again.
+ 	 */
+ 	if (cascading_walsender)
+ 	{
+ 		/* use volatile pointer to prevent code rearrangement */
+ 		volatile WalSnd *walsnd = MyWalSnd;
+ 		bool		reload;
+ 
+ 		SpinLockAcquire(&walsnd->mutex);
+ 		reload = walsnd->reload;
+ 		walsnd->reload = false;
+ 		SpinLockRelease(&walsnd->mutex);
+ 
+ 		if (reload && sendFile >= 0)
+ 		{
+ 			close(sendFile);
+ 			sendFile = -1;
+ 
+ 			goto retry;
+ 		}
+ 	}
  }
  
  /*
***************
*** 1082,1088 **** XLogSend(char *msgbuf, bool *caughtup)
  	 * subsequently crashes and restarts, slaves must not have applied any WAL
  	 * that gets lost on the master.
  	 */
! 	SendRqstPtr = GetFlushRecPtr();
  
  	/* Quick exit if nothing to do */
  	if (XLByteLE(SendRqstPtr, sentPtr))
--- 1133,1139 ----
  	 * subsequently crashes and restarts, slaves must not have applied any WAL
  	 * that gets lost on the master.
  	 */
! 	SendRqstPtr = cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr();
  
  	/* Quick exit if nothing to do */
  	if (XLByteLE(SendRqstPtr, sentPtr))
***************
*** 1187,1192 **** XLogSend(char *msgbuf, bool *caughtup)
--- 1238,1265 ----
  	return;
  }
  
+ /*
+  * Request walsenders to reload the currently-open WAL file
+  */
+ void
+ WalSndRqstFileReload(void)
+ {
+ 	int			i;
+ 
+ 	for (i = 0; i < max_wal_senders; i++)
+ 	{
+ 		/* use volatile pointer to prevent code rearrangement */
+ 		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ 
+ 		if (walsnd->pid == 0)
+ 			continue;
+ 
+ 		SpinLockAcquire(&walsnd->mutex);
+ 		walsnd->reload = true;
+ 		SpinLockRelease(&walsnd->mutex);
+ 	}
+ }
+ 
  /* SIGHUP: set flag to re-read config file at next convenient time */
  static void
  WalSndSigHupHandler(SIGNAL_ARGS)
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 221,226 **** extern int	wal_level;
--- 221,229 ----
  /* Do we need to WAL-log information required only for Hot Standby? */
  #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_HOT_STANDBY)
  
+ /* Can we allow the standby to accept replication connection from another standby? */
+ #define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
+ 
  #ifdef WAL_DEBUG
  extern bool XLOG_DEBUG;
  #endif
***************
*** 292,298 **** extern bool RecoveryInProgress(void);
  extern bool HotStandbyActive(void);
  extern bool XLogInsertAllowed(void);
  extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
! extern XLogRecPtr GetXLogReplayRecPtr(void);
  
  extern void UpdateControlFile(void);
  extern uint64 GetSystemIdentifier(void);
--- 295,302 ----
  extern bool HotStandbyActive(void);
  extern bool XLogInsertAllowed(void);
  extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
! extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr);
! extern XLogRecPtr GetStandbyFlushRecPtr(void);
  
  extern void UpdateControlFile(void);
  extern uint64 GetSystemIdentifier(void);
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 35,40 **** typedef struct WalSnd
--- 35,41 ----
  	pid_t		pid;			/* this walsender's process id, or 0 */
  	WalSndState state;			/* this walsender's state */
  	XLogRecPtr	sentPtr;		/* WAL has been sent up to this point */
+ 	bool			reload;		/* does currently-open file need to be reloaded? */
  
  	/*
  	 * The xlog locations that have been written, flushed, and applied by
***************
*** 92,97 **** extern WalSndCtlData *WalSndCtl;
--- 93,99 ----
  
  /* global state */
  extern bool am_walsender;
+ extern bool cascading_walsender;
  extern volatile sig_atomic_t walsender_shutdown_requested;
  extern volatile sig_atomic_t walsender_ready_to_stop;
  
***************
*** 106,112 **** extern Size WalSndShmemSize(void);
  extern void WalSndShmemInit(void);
  extern void WalSndWakeup(void);
  extern void WalSndSetState(WalSndState state);
! extern void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
  
  extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);
  
--- 108,115 ----
  extern void WalSndShmemInit(void);
  extern void WalSndWakeup(void);
  extern void WalSndSetState(WalSndState state);
! extern void XLogRead(char *buf, XLogRecPtr startptr, Size count);
! extern void WalSndRqstFileReload(void);
  
  extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);
  
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to