*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2017,2022 **** SET ENABLE_SEQSCAN TO OFF;
--- 2017,2044 ----
         </para>
        </listitem>
       </varlistentry>
+ 
+      <varlistentry id="guc-replication-timeout" xreflabel="replication_timeout">
+       <term><varname>replication_timeout</varname> (<type>integer</type>)</term>
+       <indexterm>
+        <primary><varname>replication_timeout</> configuration parameter</primary>
+       </indexterm>
+       <listitem>
+        <para>
+         Specifies the maximum time, in milliseconds, to wait for a reply
+         from the standby before terminating replication.  This allows
+         the primary server to detect a standby crash or network outage.
+         A value of zero disables the timeout.  This parameter can only be set in
+         the <filename>postgresql.conf</> file or on the server command line.
+         The default value is 60 seconds.
+        </para>
+        <para>
+         To make the timeout work properly, <xref linkend="guc-wal-receiver-status-interval">
+         must be enabled on the standby, and its value must be less than the
+         value of <varname>replication_timeout</>.
+        </para>
+       </listitem>
+      </varlistentry>
       </variablelist>
      </sect2>
  
***************
*** 2215,2220 **** SET ENABLE_SEQSCAN TO OFF;
--- 2237,2247 ----
         the <filename>postgresql.conf</> file or on the server command line.
         The default value is 10 seconds.
        </para>
+       <para>
+        When <xref linkend="guc-replication-timeout"> is enabled on the primary,
+        <varname>wal_receiver_status_interval</> must be enabled, and its value
+        must be less than the value of <varname>replication_timeout</>.
+       </para>
        </listitem>
       </varlistentry>
  
*** a/src/backend/libpq/pqcomm.c
--- b/src/backend/libpq/pqcomm.c
***************
*** 56,61 ****
--- 56,64 ----
   *		pq_putbytes		- send bytes to connection (not flushed until pq_flush)
   *		pq_flush		- flush pending output
   *		pq_getbyte_if_available - get a byte if available without blocking
+  *		pq_putbytes_if_writable	- send bytes to connection if writable without blocking
+  *		pq_flush_if_writable	- flush pending output if writable without blocking
+  *		pq_set_nonblocking	- set socket blocking/non-blocking
   *
   * message-level I/O (and old-style-COPY-OUT cruft):
   *		pq_putmessage	- send a normal message (suppressed in COPY OUT mode)
***************
*** 112,117 **** static char sock_path[MAXPGPATH];
--- 115,121 ----
  
  static char PqSendBuffer[PQ_BUFFER_SIZE];
  static int	PqSendPointer;		/* Next index to store a byte in PqSendBuffer */
+ static int	PqSendStart;		/* Next index to send a byte in PqSendBuffer */
  
  static char PqRecvBuffer[PQ_BUFFER_SIZE];
  static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
***************
*** 127,133 **** static bool DoingCopyOut;
  /* Internal functions */
  static void pq_close(int code, Datum arg);
  static int	internal_putbytes(const char *s, size_t len);
! static int	internal_flush(void);
  
  #ifdef HAVE_UNIX_SOCKETS
  static int	Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
--- 131,138 ----
  /* Internal functions */
  static void pq_close(int code, Datum arg);
  static int	internal_putbytes(const char *s, size_t len);
! static int	internal_flush(bool nonblocking);
! static void pq_set_nonblocking(bool nonblocking, int emode);
  
  #ifdef HAVE_UNIX_SOCKETS
  static int	Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
***************
*** 142,148 **** static int	Setup_AF_UNIX(void);
  void
  pq_init(void)
  {
! 	PqSendPointer = PqRecvPointer = PqRecvLength = 0;
  	PqCommBusy = false;
  	DoingCopyOut = false;
  	on_proc_exit(pq_close, 0);
--- 147,153 ----
  void
  pq_init(void)
  {
! 	PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
  	PqCommBusy = false;
  	DoingCopyOut = false;
  	on_proc_exit(pq_close, 0);
***************
*** 846,859 **** pq_getbyte_if_available(unsigned char *c)
  	}
  
  	/* Temporarily put the socket into non-blocking mode */
! #ifdef WIN32
! 	pgwin32_noblock = 1;
! #else
! 	if (!pg_set_noblock(MyProcPort->sock))
! 		ereport(ERROR,
! 				(errmsg("could not set socket to non-blocking mode: %m")));
! #endif
! 	MyProcPort->noblock = true;
  	PG_TRY();
  	{
  		r = secure_read(MyProcPort, c, 1);
--- 851,857 ----
  	}
  
  	/* Temporarily put the socket into non-blocking mode */
! 	pq_set_nonblocking(true, ERROR);
  	PG_TRY();
  	{
  		r = secure_read(MyProcPort, c, 1);
***************
*** 892,916 **** pq_getbyte_if_available(unsigned char *c)
  		 * The rest of the backend code assumes the socket is in blocking
  		 * mode, so treat failure as FATAL.
  		 */
! #ifdef WIN32
! 		pgwin32_noblock = 0;
! #else
! 		if (!pg_set_block(MyProcPort->sock))
! 			ereport(FATAL,
! 					(errmsg("could not set socket to blocking mode: %m")));
! #endif
! 		MyProcPort->noblock = false;
  		PG_RE_THROW();
  	}
  	PG_END_TRY();
! #ifdef WIN32
! 	pgwin32_noblock = 0;
! #else
! 	if (!pg_set_block(MyProcPort->sock))
! 		ereport(FATAL,
! 				(errmsg("could not set socket to blocking mode: %m")));
! #endif
! 	MyProcPort->noblock = false;
  
  	return r;
  }
--- 890,900 ----
  		 * The rest of the backend code assumes the socket is in blocking
  		 * mode, so treat failure as FATAL.
  		 */
! 		pq_set_nonblocking(false, FATAL);
  		PG_RE_THROW();
  	}
  	PG_END_TRY();
! 	pq_set_nonblocking(false, FATAL);
  
  	return r;
  }
***************
*** 1139,1145 **** internal_putbytes(const char *s, size_t len)
  	{
  		/* If buffer is full, then flush it out */
  		if (PqSendPointer >= PQ_BUFFER_SIZE)
! 			if (internal_flush())
  				return EOF;
  		amount = PQ_BUFFER_SIZE - PqSendPointer;
  		if (amount > len)
--- 1123,1129 ----
  	{
  		/* If buffer is full, then flush it out */
  		if (PqSendPointer >= PQ_BUFFER_SIZE)
! 			if (internal_flush(false) == EOF)
  				return EOF;
  		amount = PQ_BUFFER_SIZE - PqSendPointer;
  		if (amount > len)
***************
*** 1153,1158 **** internal_putbytes(const char *s, size_t len)
--- 1137,1192 ----
  }
  
  /* --------------------------------
+  *		pq_putbytes_if_writable - buffer bytes for sending, without
+  *			blocking when a full buffer has to be flushed first
+  *
+  * Returns number of bytes buffered (can be less than len), or EOF if trouble.
+  * --------------------------------
+  */
+ int
+ pq_putbytes_if_writable(const char *s, size_t len)
+ {
+ 	size_t		amount;
+ 	size_t		nwritten = 0;
+ 
+ 	/* Should not be called by old-style COPY OUT */
+ 	Assert(!DoingCopyOut);
+ 	/* No-op if reentrant call; report that nothing was buffered */
+ 	if (PqCommBusy)
+ 		return 0;
+ 	PqCommBusy = true;
+ 
+ 	while (len > 0)
+ 	{
+ 		/* If buffer is full, flush as much as the socket accepts right now */
+ 		if (PqSendPointer >= PQ_BUFFER_SIZE)
+ 		{
+ 			int		r;
+ 
+ 			r = internal_flush(true);
+ 			if (r == 0)
+ 				break;			/* would block: return a short count */
+ 			if (r == EOF)
+ 			{
+ 				PqCommBusy = false;
+ 				return r;
+ 			}
+ 		}
+ 		amount = PQ_BUFFER_SIZE - PqSendPointer;
+ 		if (amount > len)
+ 			amount = len;
+ 		memcpy(PqSendBuffer + PqSendPointer, s, amount);
+ 		PqSendPointer += amount;
+ 		s += amount;
+ 		len -= amount;
+ 		nwritten += amount;
+ 	}
+ 
+ 	PqCommBusy = false;
+ 	return (int) nwritten;
+ }
+ 
+ /* --------------------------------
   *		pq_flush		- flush pending output
   *
   *		returns 0 if OK, EOF if trouble
***************
*** 1167,1227 **** pq_flush(void)
  	if (PqCommBusy)
  		return 0;
  	PqCommBusy = true;
! 	res = internal_flush();
  	PqCommBusy = false;
! 	return res;
  }
  
  static int
! internal_flush(void)
  {
  	static int	last_reported_send_errno = 0;
  
! 	char	   *bufptr = PqSendBuffer;
  	char	   *bufend = PqSendBuffer + PqSendPointer;
  
! 	while (bufptr < bufend)
  	{
! 		int			r;
! 
! 		r = secure_write(MyProcPort, bufptr, bufend - bufptr);
! 
! 		if (r <= 0)
  		{
! 			if (errno == EINTR)
! 				continue;		/* Ok if we were interrupted */
  
! 			/*
! 			 * Careful: an ereport() that tries to write to the client would
! 			 * cause recursion to here, leading to stack overflow and core
! 			 * dump!  This message must go *only* to the postmaster log.
! 			 *
! 			 * If a client disconnects while we're in the midst of output, we
! 			 * might write quite a bit of data before we get to a safe query
! 			 * abort point.  So, suppress duplicate log messages.
! 			 */
! 			if (errno != last_reported_send_errno)
  			{
! 				last_reported_send_errno = errno;
! 				ereport(COMMERROR,
! 						(errcode_for_socket_access(),
! 						 errmsg("could not send data to client: %m")));
  			}
  
! 			/*
! 			 * We drop the buffered data anyway so that processing can
! 			 * continue, even though we'll probably quit soon.
! 			 */
! 			PqSendPointer = 0;
! 			return EOF;
  		}
- 
- 		last_reported_send_errno = 0;	/* reset after any successful send */
- 		bufptr += r;
  	}
  
! 	PqSendPointer = 0;
! 	return 0;
  }
  
  
--- 1201,1362 ----
  	if (PqCommBusy)
  		return 0;
  	PqCommBusy = true;
! 	res = internal_flush(false);
  	PqCommBusy = false;
! 	return (res == 1) ? 0 : EOF;
  }
  
+ /* --------------------------------
+  *		internal_flush - flush pending output
+  *
+  * Returns 1 if OK, 0 if pending output cannot be written without blocking
+  * (only possible if nonblocking is true), or EOF if trouble.
+  * --------------------------------
+  */
  static int
! internal_flush(bool nonblocking)
  {
  	static int	last_reported_send_errno = 0;
+ 	int		r = 1;		/* nothing pending counts as success */
  
! 	char	   *bufptr = PqSendBuffer + PqSendStart;
  	char	   *bufend = PqSendBuffer + PqSendPointer;
  
! 	/* Temporarily put the socket into non-blocking mode */
! 	if (nonblocking)
! 		pq_set_nonblocking(true, ERROR);
! 	PG_TRY();
  	{
! 		while (bufptr < bufend)
  		{
! 			r = secure_write(MyProcPort, bufptr, bufend - bufptr);
  
! 			if (r <= 0)
  			{
! 				/* Ok if we were interrupted in blocking mode */
! 				if (!nonblocking && errno == EINTR)
! 					continue;
! 
! 				/*
! 				 * In non-blocking mode, "would block" is not a failure:
! 				 * leave the unsent bytes buffered and let the caller
! 				 * retry later.  (EINTR really shouldn't happen with a
! 				 * non-blocking socket, but is treated the same way.)
! 				 * A zero-byte write has no meaningful errno, so it is
! 				 * never taken for "would block".
! 				 */
! 				if (nonblocking && r < 0 &&
! 					(errno == EAGAIN ||
! 					 errno == EWOULDBLOCK ||
! 					 errno == EINTR))
! 				{
! 					r = 0;
! 					break;
! 				}
! 
! 				/*
! 				 * Any other failure is reported in both modes.
! 				 *
! 				 * Careful: an ereport() that tries to write to the
! 				 * client would cause recursion to here, leading to
! 				 * stack overflow and core dump!  This message must
! 				 * go *only* to the postmaster log.
! 				 *
! 				 * If a client disconnects while we're in the midst
! 				 * of output, we might write quite a bit of data before
! 				 * we get to a safe query abort point.  So, suppress
! 				 * duplicate log messages.
! 				 */
! 				if (errno != last_reported_send_errno)
! 				{
! 					last_reported_send_errno = errno;
! 					ereport(COMMERROR,
! 							(errcode_for_socket_access(),
! 							 errmsg("could not send data to client: %m")));
! 				}
  
! 				/*
! 				 * We drop the buffered data anyway so that processing can
! 				 * continue, even though we'll probably quit soon.
! 				 */
! 				PqSendStart = PqSendPointer = 0;
! 				r = EOF;
! 				break;
  			}
  
! 			last_reported_send_errno = 0;	/* reset after any successful send */
! 			bufptr += r;
! 			PqSendStart += r;
  		}
  	}
+ 	PG_CATCH();
+ 	{
+ 		/*
+ 		 * Restore blocking mode (FATAL on failure), then propagate the error.
+ 		 */
+ 		if (nonblocking)
+ 			pq_set_nonblocking(false, FATAL);
+ 		PG_RE_THROW();
+ 	}
+ 	PG_END_TRY();
+ 	if (nonblocking)
+ 		pq_set_nonblocking(false, FATAL);
  
! 	if (r == 0 || r == EOF)
! 		return r;
! 
! 	PqSendStart = PqSendPointer = 0;
! 	return 1;
! }
! 
! /* --------------------------------
!  *		pq_flush_if_writable - flush pending output if writable
!  *
!  * Returns 1 if OK, 0 if pending output cannot be written without blocking
!  * (a reentrant call also returns 0 without sending), or EOF if trouble.
!  * --------------------------------
!  */
! int
! pq_flush_if_writable(void)
! {
! 	int			res;
! 
! 	/* No-op if reentrant call */
! 	if (PqCommBusy)
! 		return 0;
! 	PqCommBusy = true;
! 	res = internal_flush(true);
! 	PqCommBusy = false;
! 	return res;
! }
! 
! /* --------------------------------
!  *		pq_set_nonblocking - set socket blocking/non-blocking
!  *
!  * Puts the socket in non-blocking mode if nonblocking is true, blocking
!  * mode otherwise.  A failure is reported via ereport() at level emode.
!  * --------------------------------
!  */
! static void
! pq_set_nonblocking(bool nonblocking, int emode)
! {
! #ifdef WIN32
! 	pgwin32_noblock = nonblocking ? 1 : 0;
! #else
! 	if (nonblocking)
! 	{
! 		if (!pg_set_noblock(MyProcPort->sock))
! 			ereport(emode,
! 					(errmsg("could not set socket to non-blocking mode: %m")));
! 	}
! 	else
! 	{
! 		if (!pg_set_block(MyProcPort->sock))
! 			ereport(emode,
! 					(errmsg("could not set socket to blocking mode: %m")));
! 	}
! #endif
! 	MyProcPort->noblock = nonblocking;
  }
  
  
*** a/src/backend/port/unix_latch.c
--- b/src/backend/port/unix_latch.c
***************
*** 193,211 **** DisownLatch(volatile Latch *latch)
  bool
  WaitLatch(volatile Latch *latch, long timeout)
  {
! 	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
  }
  
  /*
   * Like WaitLatch, but will also return when there's data available in
!  * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch
!  * was set, or 2 if the scoket became readable.
   */
  int
! WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
  {
  	struct timeval tv, *tvp = NULL;
  	fd_set		input_mask;
  	int			rc;
  	int			result = 0;
  
--- 193,213 ----
  bool
  WaitLatch(volatile Latch *latch, long timeout)
  {
! 	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
  }
  
  /*
   * Like WaitLatch, but will also return when there's data available in
!  * 'sock' for reading or writing. Returns 0 if timeout was reached,
!  * 1 if the latch was set, 2 if the socket became readable or writable.
   */
  int
! WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead,
! 				  bool forWrite, long timeout)
  {
  	struct timeval tv, *tvp = NULL;
  	fd_set		input_mask;
+ 	fd_set		output_mask;
  	int			rc;
  	int			result = 0;
  
***************
*** 241,254 **** WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
  		FD_ZERO(&input_mask);
  		FD_SET(selfpipe_readfd, &input_mask);
  		hifd = selfpipe_readfd;
! 		if (sock != PGINVALID_SOCKET)
  		{
  			FD_SET(sock, &input_mask);
  			if (sock > hifd)
  				hifd = sock;
  		}
  
! 		rc = select(hifd + 1, &input_mask, NULL, NULL, tvp);
  		if (rc < 0)
  		{
  			if (errno == EINTR)
--- 243,264 ----
  		FD_ZERO(&input_mask);
  		FD_SET(selfpipe_readfd, &input_mask);
  		hifd = selfpipe_readfd;
! 		if (sock != PGINVALID_SOCKET && forRead)
  		{
  			FD_SET(sock, &input_mask);
  			if (sock > hifd)
  				hifd = sock;
  		}
  
! 		FD_ZERO(&output_mask);
! 		if (sock != PGINVALID_SOCKET && forWrite)
! 		{
! 			FD_SET(sock, &output_mask);
! 			if (sock > hifd)
! 				hifd = sock;
! 		}
! 
! 		rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);
  		if (rc < 0)
  		{
  			if (errno == EINTR)
***************
*** 263,269 **** WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
  			result = 0;
  			break;
  		}
! 		if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask))
  		{
  			result = 2;
  			break;		/* data available in socket */
--- 273,281 ----
  			result = 0;
  			break;
  		}
! 		if (sock != PGINVALID_SOCKET &&
! 			((forRead && FD_ISSET(sock, &input_mask)) ||
! 			 (forWrite && FD_ISSET(sock, &output_mask))))
  		{
  			result = 2;
  			break;		/* data available in socket */
*** a/src/backend/port/win32/socket.c
--- b/src/backend/port/win32/socket.c
***************
*** 14,20 ****
  #include "postgres.h"
  
  /*
!  * Indicate if pgwin32_recv() should operate in non-blocking mode.
   *
   * Since the socket emulation layer always sets the actual socket to
   * non-blocking mode in order to be able to deliver signals, we must
--- 14,21 ----
  #include "postgres.h"
  
  /*
!  * Indicate if pgwin32_recv() and pgwin32_send() should operate
!  * in non-blocking mode.
   *
   * Since the socket emulation layer always sets the actual socket to
   * non-blocking mode in order to be able to deliver signals, we must
***************
*** 399,404 **** pgwin32_send(SOCKET s, char *buf, int len, int flags)
--- 400,415 ----
  			return -1;
  		}
  
+ 		if (pgwin32_noblock)
+ 		{
+ 			/*
+ 			 * No data sent, and we are in "emulated non-blocking mode", so
+ 			 * return indicating that we'd block if we were to continue.
+ 			 */
+ 			errno = EWOULDBLOCK;
+ 			return -1;
+ 		}
+ 
  		/* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */
  
  		if (pgwin32_waitforsinglesocket(s, FD_WRITE | FD_CLOSE, INFINITE) == 0)
*** a/src/backend/port/win32_latch.c
--- b/src/backend/port/win32_latch.c
***************
*** 85,95 **** DisownLatch(volatile Latch *latch)
  bool
  WaitLatch(volatile Latch *latch, long timeout)
  {
! 	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
  }
  
  int
! WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
  {
  	DWORD		rc;
  	HANDLE		events[3];
--- 85,96 ----
  bool
  WaitLatch(volatile Latch *latch, long timeout)
  {
! 	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
  }
  
  int
! WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, bool forRead,
! 				  bool forWrite, long timeout)
  {
  	DWORD		rc;
  	HANDLE		events[3];
***************
*** 103,112 **** WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
  	events[0] = latchevent;
  	events[1] = pgwin32_signal_event;
  	numevents = 2;
! 	if (sock != PGINVALID_SOCKET)
  	{
  		sockevent = WSACreateEvent();
! 		WSAEventSelect(sock, sockevent, FD_READ);
  		events[numevents++] = sockevent;
  	}
  
--- 104,120 ----
  	events[0] = latchevent;
  	events[1] = pgwin32_signal_event;
  	numevents = 2;
! 	if (sock != PGINVALID_SOCKET && (forRead || forWrite))
  	{
+ 		int		flags = 0;
+ 
+ 		if (forRead)
+ 			flags |= FD_READ;
+ 		if (forWrite)
+ 			flags |= FD_WRITE;
+ 
  		sockevent = WSACreateEvent();
! 		WSAEventSelect(sock, sockevent, flags);
  		events[numevents++] = sockevent;
  	}
  
***************
*** 139,146 **** WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
  			pgwin32_dispatch_queued_signals();
  		else if (rc == WAIT_OBJECT_0 + 2)
  		{
  			Assert(sock != PGINVALID_SOCKET);
! 			result = 2;
  			break;
  		}
  		else if (rc != WAIT_OBJECT_0)
--- 147,164 ----
  			pgwin32_dispatch_queued_signals();
  		else if (rc == WAIT_OBJECT_0 + 2)
  		{
+ 			WSANETWORKEVENTS resEvents;
+ 
  			Assert(sock != PGINVALID_SOCKET);
! 
! 			ZeroMemory(&resEvents, sizeof(resEvents));
! 			if (WSAEnumNetworkEvents(sock, sockevent, &resEvents) == SOCKET_ERROR)
! 				ereport(FATAL,
! 						(errmsg_internal("failed to enumerate network events: %i", (int) GetLastError())));
! 
! 			if ((forRead && resEvents.lNetworkEvents & FD_READ) ||
! 				(forWrite && resEvents.lNetworkEvents & FD_WRITE))
! 				result = 2;
  			break;
  		}
  		else if (rc != WAIT_OBJECT_0)
***************
*** 148,154 **** WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
  	}
  
  	/* Clean up the handle we created for the socket */
! 		if (sock != PGINVALID_SOCKET)
  	{
  		WSAEventSelect(sock, sockevent, 0);
  		WSACloseEvent(sockevent);
--- 166,172 ----
  	}
  
  	/* Clean up the handle we created for the socket */
! 	if (sock != PGINVALID_SOCKET && (forRead || forWrite))
  	{
  		WSAEventSelect(sock, sockevent, 0);
  		WSACloseEvent(sockevent);
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 74,79 **** bool		am_walsender = false;		/* Am I a walsender process ? */
--- 74,91 ----
  /* User-settable parameters for walsender */
  int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
  int			WalSndDelay = 1000;	/* max sleep time between some actions */
+ int			replication_timeout = 60 * 1000;	/* max time to wait for a standby reply, in ms; 0 disables */
+ 
+ /*
+  * Buffer for WAL sending
+  *
+  * WalSndOutBuffer is a work area in which the output message is constructed.
+  * It's kept around just so we can avoid re-palloc'ing the buffer on each cycle.
+  * It must be of size 6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
+  */
+ static char	   *WalSndOutBuffer;
+ static int		WalSndOutHead;		/* head of pending output */
+ static int		WalSndOutTail;		/* tail of pending output */
  
  /*
   * These variables are used similarly to openLogFile/Id/Seg/Off,
***************
*** 95,100 **** static XLogRecPtr sentPtr = {0, 0};
--- 107,117 ----
   */
  static StringInfoData reply_message;
  
+ /*
+  * Timestamp of the last receipt of the reply from the standby.
+  */
+ static TimestampTz last_reply_timestamp;
+ 
  /* Flags set by signal handlers for later service in main loop */
  static volatile sig_atomic_t got_SIGHUP = false;
  volatile sig_atomic_t walsender_shutdown_requested = false;
***************
*** 113,119 **** static int	WalSndLoop(void);
  static void InitWalSnd(void);
  static void WalSndHandshake(void);
  static void WalSndKill(int code, Datum arg);
! static bool XLogSend(char *msgbuf, bool *caughtup);
  static void IdentifySystem(void);
  static void StartReplication(StartReplicationCmd * cmd);
  static void ProcessStandbyMessage(void);
--- 130,136 ----
  static void InitWalSnd(void);
  static void WalSndHandshake(void);
  static void WalSndKill(int code, Datum arg);
! static bool XLogSend(bool *caughtup, bool *pending);
  static void IdentifySystem(void);
  static void StartReplication(StartReplicationCmd * cmd);
  static void ProcessStandbyMessage(void);
***************
*** 469,474 **** ProcessRepliesIfAny(void)
--- 486,492 ----
  {
  	unsigned char firstchar;
  	int			r;
+ 	bool		received = false;	/* processed a standby reply in this call? */
  
  	for (;;)
  	{
***************
*** 481,489 **** ProcessRepliesIfAny(void)
  					 errmsg("unexpected EOF on standby connection")));
  			proc_exit(0);
  		}
! 		if (r == 0)
  		{
! 			/* no data available without blocking */
  			return;
  		}
  
--- 499,512 ----
  					 errmsg("unexpected EOF on standby connection")));
  			proc_exit(0);
  		}
! 		if (r == 0)	/* no data available without blocking */
  		{
! 			/*
! 			 * Save the last reply timestamp if we've received at least
! 			 * one reply.
! 			 */
! 			if (received)
! 				last_reply_timestamp = GetCurrentTimestamp();
  			return;
  		}
  
***************
*** 495,500 **** ProcessRepliesIfAny(void)
--- 518,524 ----
  				 */
  			case 'd':
  				ProcessStandbyMessage();
+ 				received = true;
  				break;
  
  				/*
***************
*** 673,687 **** ProcessStandbyHSFeedbackMessage(void)
  static int
  WalSndLoop(void)
  {
- 	char	   *output_message;
  	bool		caughtup = false;
  
  	/*
  	 * Allocate buffer that will be used for each output message.  We do this
  	 * just once to reduce palloc overhead.  The buffer must be made large
  	 * enough for maximum-sized messages.
  	 */
! 	output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
  
  	/*
  	 * Allocate buffer that will be used for processing reply messages.  As
--- 697,712 ----
  static int
  WalSndLoop(void)
  {
  	bool		caughtup = false;
+ 	bool		pending = false;
  
  	/*
  	 * Allocate buffer that will be used for each output message.  We do this
  	 * just once to reduce palloc overhead.  The buffer must be made large
  	 * enough for maximum-sized messages.
  	 */
! 	WalSndOutBuffer = palloc(6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
! 	WalSndOutHead = WalSndOutTail = 0;
  
  	/*
  	 * Allocate buffer that will be used for processing reply messages.  As
***************
*** 689,694 **** WalSndLoop(void)
--- 714,722 ----
  	 */
  	initStringInfo(&reply_message);
  
+ 	/* Initialize the last reply timestamp */
+ 	last_reply_timestamp = GetCurrentTimestamp();
+ 
  	/* Loop forever, unless we get an error */
  	for (;;)
  	{
***************
*** 713,722 **** WalSndLoop(void)
  		 */
  		if (walsender_ready_to_stop)
  		{
! 			if (!XLogSend(output_message, &caughtup))
  				break;
  			ProcessRepliesIfAny();
! 			if (caughtup)
  				walsender_shutdown_requested = true;
  		}
  
--- 741,750 ----
  		 */
  		if (walsender_ready_to_stop)
  		{
! 			if (!XLogSend(&caughtup, &pending))
  				break;
  			ProcessRepliesIfAny();
! 			if (caughtup && !pending)
  				walsender_shutdown_requested = true;
  		}
  
***************
*** 731,740 **** WalSndLoop(void)
  		}
  
  		/*
! 		 * If we had sent all accumulated WAL in last round, nap for the
! 		 * configured time before retrying.
  		 */
! 		if (caughtup)
  		{
  			/*
  			 * Even if we wrote all the WAL that was available when we started
--- 759,769 ----
  		}
  
  		/*
! 		 * If we had sent all accumulated WAL in last round or could not
! 		 * flush pending WAL in output buffer because the socket was not
! 		 * writable, nap for the configured time before retrying.
  		 */
! 		if (caughtup || pending)
  		{
  			/*
  			 * Even if we wrote all the WAL that was available when we started
***************
*** 745,769 **** WalSndLoop(void)
  			 */
  			ResetLatch(&MyWalSnd->latch);
  
! 			if (!XLogSend(output_message, &caughtup))
  				break;
! 			if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested)
  			{
  				/*
  				 * XXX: We don't really need the periodic wakeups anymore,
  				 * WaitLatchOrSocket should reliably wake up as soon as
  				 * something interesting happens.
  				 */
  
  				/* Sleep */
  				WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
! 								  WalSndDelay * 1000L);
  			}
  		}
  		else
  		{
  			/* Attempt to send the log once every loop */
! 			if (!XLogSend(output_message, &caughtup))
  				break;
  		}
  
--- 774,834 ----
  			 */
  			ResetLatch(&MyWalSnd->latch);
  
! 			if (!XLogSend(&caughtup, &pending))
  				break;
! 			if ((caughtup || pending) && !got_SIGHUP && !walsender_ready_to_stop &&
! 					!walsender_shutdown_requested)
  			{
+ 				TimestampTz	finish_time;
+ 				long		sleeptime;
+ 
  				/*
  				 * XXX: We don't really need the periodic wakeups anymore,
  				 * WaitLatchOrSocket should reliably wake up as soon as
  				 * something interesting happens.
  				 */
  
+ 				/* Reschedule replication timeout */
+ 				if (replication_timeout > 0)
+ 				{
+ 					long		secs;
+ 					int		usecs;
+ 
+ 					finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+ 											replication_timeout);
+ 					TimestampDifference(GetCurrentTimestamp(),
+ 								finish_time, &secs, &usecs);
+ 					sleeptime = secs * 1000 + usecs / 1000;
+ 					if (WalSndDelay < sleeptime)
+ 						sleeptime = WalSndDelay;
+ 				}
+ 				else
+ 					sleeptime = WalSndDelay;
+ 
  				/* Sleep */
  				WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
! 								  true, (WalSndOutTail > 0),
! 								  sleeptime * 1000L);
! 
! 				/* Check for replication timeout */
! 				if (replication_timeout > 0 &&
! 					GetCurrentTimestamp() >= finish_time)
! 				{
! 					/*
! 					 * Since typically expiration of replication timeout means
! 					 * communication problem, we don't send the error message
! 					 * to the standby.
! 					 */
! 					ereport(COMMERROR,
! 							(errmsg("terminating walsender process due to replication timeout")));
! 					break;
! 				}
  			}
  		}
  		else
  		{
  			/* Attempt to send the log once every loop */
! 			if (!XLogSend(&caughtup, &pending))
  				break;
  		}
  
***************
*** 996,1019 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
   * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
   * but not yet sent to the client, and send it.
   *
-  * msgbuf is a work area in which the output message is constructed.  It's
-  * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
-  * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
-  *
   * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
   * *caughtup is set to false.
   *
   * Returns true if OK, false if trouble.
   */
  static bool
! XLogSend(char *msgbuf, bool *caughtup)
  {
  	XLogRecPtr	SendRqstPtr;
  	XLogRecPtr	startptr;
! 	XLogRecPtr	endptr;
  	Size		nbytes;
  	WalDataMessageHeader msghdr;
  
  	/*
  	 * Attempt to send all data that's already been written out and fsync'd to
  	 * disk.  We cannot go further than what's been written out given the
--- 1061,1108 ----
   * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
   * but not yet sent to the client, and send it.
   *
   * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
   * *caughtup is set to false.
   *
+  * If there is pending WAL in output buffer, *pending is set to true,
+  * otherwise *pending is set to false.
+  *
   * Returns true if OK, false if trouble.
   */
  static bool
! XLogSend(bool *caughtup, bool *pending)
  {
  	XLogRecPtr	SendRqstPtr;
  	XLogRecPtr	startptr;
! 	static XLogRecPtr	endptr;
  	Size		nbytes;
+ 	uint32		n32;
+ 	int			res;
  	WalDataMessageHeader msghdr;
  
+ 	/* Attempt to flush pending WAL in output buffer */
+ 	if (*pending)
+ 	{
+ 		if (WalSndOutHead != WalSndOutTail)
+ 		{
+ 			res = pq_putbytes_if_writable(WalSndOutBuffer + WalSndOutHead,
+ 										  WalSndOutTail - WalSndOutHead);
+ 			if (res == EOF)
+ 				return false;
+ 			WalSndOutHead += res;
+ 			if (WalSndOutHead != WalSndOutTail)
+ 				return true;
+ 		}
+ 
+ 		res = pq_flush_if_writable();
+ 		if (res == EOF)
+ 			return false;
+ 		if (res == 0)
+ 			return true;
+ 
+ 		goto updt;
+ 	}
+ 
  	/*
  	 * Attempt to send all data that's already been written out and fsync'd to
  	 * disk.  We cannot go further than what's been written out given the
***************
*** 1082,1094 **** XLogSend(char *msgbuf, bool *caughtup)
  	/*
  	 * OK to read and send the slice.
  	 */
! 	msgbuf[0] = 'w';
  
  	/*
  	 * Read the log directly into the output buffer to avoid extra memcpy
  	 * calls.
  	 */
! 	XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
  
  	/*
  	 * We fill the message header last so that the send timestamp is taken as
--- 1171,1189 ----
  	/*
  	 * OK to read and send the slice.
  	 */
! 	WalSndOutBuffer[0] = 'd';
! 	WalSndOutBuffer[5] = 'w';
! 	WalSndOutHead = 0;
! 	WalSndOutTail = 6 + sizeof(WalDataMessageHeader) + nbytes;
! 
! 	n32 = htonl((uint32) WalSndOutTail - 1);
! 	memcpy(WalSndOutBuffer + 1, &n32, 4);
  
  	/*
  	 * Read the log directly into the output buffer to avoid extra memcpy
  	 * calls.
  	 */
! 	XLogRead(WalSndOutBuffer + 6 + sizeof(WalDataMessageHeader), startptr, nbytes);
  
  	/*
  	 * We fill the message header last so that the send timestamp is taken as
***************
*** 1098,1110 **** XLogSend(char *msgbuf, bool *caughtup)
  	msghdr.walEnd = SendRqstPtr;
  	msghdr.sendTime = GetCurrentTimestamp();
  
! 	memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
  
! 	pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
  
  	/* Flush pending output to the client */
! 	if (pq_flush())
  		return false;
  
  	sentPtr = endptr;
  
--- 1193,1226 ----
  	msghdr.walEnd = SendRqstPtr;
  	msghdr.sendTime = GetCurrentTimestamp();
  
! 	memcpy(WalSndOutBuffer + 6, &msghdr, sizeof(WalDataMessageHeader));
  
! 	res = pq_putbytes_if_writable(WalSndOutBuffer, WalSndOutTail);
! 	if (res == EOF)
! 		return false;
! 
! 	WalSndOutHead = res;
! 	if (WalSndOutHead != WalSndOutTail)
! 	{
! 		*caughtup = false;
! 		*pending = true;
! 		return true;
! 	}
  
  	/* Flush pending output to the client */
! 	res = pq_flush_if_writable();
! 	if (res == EOF)
  		return false;
+ 	if (res == 0)
+ 	{
+ 		*caughtup = false;
+ 		*pending = true;
+ 		return true;
+ 	}
+ 
+ updt:
+ 	WalSndOutHead = WalSndOutTail = 0;
+ 	*pending = false;
  
  	sentPtr = endptr;
  
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 1856,1861 **** static struct config_int ConfigureNamesInt[] =
--- 1856,1871 ----
  	},
  
  	{
+ 		{"replication_timeout", PGC_SIGHUP, WAL_REPLICATION,
+ 			gettext_noop("Sets the maximum time to wait for WAL replication."),
+ 			NULL,
+ 			GUC_UNIT_MS
+ 		},
+ 		&replication_timeout,
+ 		60 * 1000, 0, INT_MAX, NULL, NULL
+ 	},
+ 
+ 	{
  		{"commit_delay", PGC_USERSET, WAL_SETTINGS,
  			gettext_noop("Sets the delay in microseconds between transaction commit and "
  						 "flushing WAL to disk."),
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 200,205 ****
--- 200,206 ----
  #wal_sender_delay = 1s		# walsender cycle time, 1-10000 milliseconds
  #wal_keep_segments = 0		# in logfile segments, 16MB each; 0 disables
  #vacuum_defer_cleanup_age = 0	# number of xacts by which cleanup is delayed
+ #replication_timeout = 60s	# in milliseconds; 0 disables
  
  # - Standby Servers -
  
*** a/src/include/libpq/libpq.h
--- b/src/include/libpq/libpq.h
***************
*** 59,65 **** extern int	pq_getbyte(void);
--- 59,67 ----
  extern int	pq_peekbyte(void);
  extern int	pq_getbyte_if_available(unsigned char *c);
  extern int	pq_putbytes(const char *s, size_t len);
+ extern int	pq_putbytes_if_writable(const char *s, size_t len);
  extern int	pq_flush(void);
+ extern int	pq_flush_if_writable(void);
  extern int	pq_putmessage(char msgtype, const char *s, size_t len);
  extern void pq_startcopyout(void);
  extern void pq_endcopyout(bool errorAbort);
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 91,96 **** extern volatile sig_atomic_t walsender_ready_to_stop;
--- 91,97 ----
  /* user-settable parameters */
  extern int	WalSndDelay;
  extern int	max_wal_senders;
+ extern int	replication_timeout;
  
  extern int	WalSenderMain(void);
  extern void WalSndSignals(void);
*** a/src/include/storage/latch.h
--- b/src/include/storage/latch.h
***************
*** 40,46 **** extern void OwnLatch(volatile Latch *latch);
  extern void DisownLatch(volatile Latch *latch);
  extern bool WaitLatch(volatile Latch *latch, long timeout);
  extern int	WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
! 				  long timeout);
  extern void SetLatch(volatile Latch *latch);
  extern void ResetLatch(volatile Latch *latch);
  #define TestLatch(latch) (((volatile Latch *) latch)->is_set)
--- 40,46 ----
  extern void DisownLatch(volatile Latch *latch);
  extern bool WaitLatch(volatile Latch *latch, long timeout);
  extern int	WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
! 				  bool forRead, bool forWrite, long timeout);
  extern void SetLatch(volatile Latch *latch);
  extern void ResetLatch(volatile Latch *latch);
  #define TestLatch(latch) (((volatile Latch *) latch)->is_set)
