On Tue, Jan 19, 2010 at 12:20 AM, Heikki Linnakangas
<heikki.linnakan...@enterprisedb.com> wrote:
> Tom Lane wrote:
>> Heikki Linnakangas <heikki.linnakan...@enterprisedb.com> writes:
>>> Simon Riggs wrote:
>>>> Do we need a new record type for that, is there a handy record type to
>>>> bounce from?
>>
>>> After starting streaming, slices of WAL are sent as CopyData messages.
>>> The CopyData payload begins with an XLogRecPtr, followed by the WAL
>>> data. That payload format needs to be extended with a 'message type'
>>> field, and a new message type for the timestamps needs to be added.
>>
>> Whether or not anyone bothers with the timestamp message, I think adding
>> a message type header is a Must Fix item.  A protocol with no provision
>> for extension is certainly going to bite us in the rear before long.
>
> Agreed a message type header is a good idea, although we don't expect
> streaming replication and the protocol to work across different major
> versions anyway.

The attached patch adds a message type header to the payload of each
CopyData message sent from walsender to walreceiver, making the
replication protocol more extensible.

Regards,

-- 
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
*** a/doc/src/sgml/protocol.sgml
--- b/doc/src/sgml/protocol.sgml
***************
*** 4179,4190 **** The commands accepted in walsender mode are:
        already been recycled. On success, server responds with a
        CopyOutResponse message, and backend starts to stream WAL as CopyData
        messages.
       </para>
  
       <para>
!       The payload in each CopyData message consists of an XLogRecPtr,
!       indicating the starting point of the WAL in the message, immediately
!       followed by the WAL data itself.
       </para>
       <para>
         A single WAL record is never split across two CopyData messages. When
--- 4179,4243 ----
        already been recycled. On success, server responds with a
        CopyOutResponse message, and backend starts to stream WAL as CopyData
        messages.
+       The payload of each CopyData message has the following format.
       </para>
  
       <para>
!       <variablelist>
!       <varlistentry>
!       <term>
!           XLogData (B)
!       </term>
!       <listitem>
!       <para>
!       <variablelist>
!       <varlistentry>
!       <term>
!           Byte1('w')
!       </term>
!       <listitem>
!       <para>
!           Identifies the message as WAL data.
!       </para>
!       </listitem>
!       </varlistentry>
!       <varlistentry>
!       <term>
!           Int32
!       </term>
!       <listitem>
!       <para>
!           The log file number of the LSN, indicating the starting point of
!           the WAL in the message.
!       </para>
!       </listitem>
!       </varlistentry>
!       <varlistentry>
!       <term>
!           Int32
!       </term>
!       <listitem>
!       <para>
!           The byte offset of the LSN, indicating the starting point of
!           the WAL in the message.
!       </para>
!       </listitem>
!       </varlistentry>
!       <varlistentry>
!       <term>
!           Byte<replaceable>n</replaceable>
!       </term>
!       <listitem>
!       <para>
!           Data that forms part of the WAL data stream.
!       </para>
!       </listitem>
!       </varlistentry>
!       </variablelist>
!       </para>
!       </listitem>
!       </varlistentry>
!       </variablelist>
       </para>
       <para>
         A single WAL record is never split across two CopyData messages. When
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 48,55 **** static char *recvBuf = NULL;
  
  /* Prototypes for interface functions */
  static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
! static bool libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer,
! 			  int *len);
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
--- 48,55 ----
  
  /* Prototypes for interface functions */
  static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
! static bool libpqrcv_receive(int timeout, unsigned char *type,
! 							 char **buffer, int *len);
  static void libpqrcv_disconnect(void);
  
  /* Prototypes for private functions */
***************
*** 236,248 **** libpqrcv_disconnect(void)
  }
  
  /*
!  * Receive any WAL records available from XLOG stream, blocking for
   * maximum of 'timeout' ms.
   *
   * Returns:
   *
!  *   True if data was received. *recptr, *buffer and *len are set to
!  *   the WAL location of the received data, buffer holding it, and length,
   *   respectively.
   *
   *   False if no data was available within timeout, or wait was interrupted
--- 236,248 ----
  }
  
  /*
!  * Receive any messages available from XLOG stream, blocking for
   * maximum of 'timeout' ms.
   *
   * Returns:
   *
!  *   True if data was received. *type, *buffer and *len are set to
!  *   the type of the received data, buffer holding it, and length,
   *   respectively.
   *
   *   False if no data was available within timeout, or wait was interrupted
***************
*** 254,260 **** libpqrcv_disconnect(void)
   * ereports on error.
   */
  static bool
! libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
  {
  	int			rawlen;
  
--- 254,260 ----
   * ereports on error.
   */
  static bool
! libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
  {
  	int			rawlen;
  
***************
*** 275,288 **** libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
  
  		if (PQconsumeInput(streamConn) == 0)
  			ereport(ERROR,
! 					(errmsg("could not read xlog records: %s",
  							PQerrorMessage(streamConn))));
  	}
  	justconnected = false;
  
  	/* Receive CopyData message */
  	rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
! 	if (rawlen == 0)	/* no records available yet, then return */
  		return false;
  	if (rawlen == -1)	/* end-of-streaming or error */
  	{
--- 275,288 ----
  
  		if (PQconsumeInput(streamConn) == 0)
  			ereport(ERROR,
! 					(errmsg("could not receive data from XLOG stream: %s",
  							PQerrorMessage(streamConn))));
  	}
  	justconnected = false;
  
  	/* Receive CopyData message */
  	rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
! 	if (rawlen == 0)	/* no data available yet, then return */
  		return false;
  	if (rawlen == -1)	/* end-of-streaming or error */
  	{
***************
*** 297,318 **** libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
  		}
  		PQclear(res);
  		ereport(ERROR,
! 				(errmsg("could not read xlog records: %s",
  						PQerrorMessage(streamConn))));
  	}
  	if (rawlen < -1)
  		ereport(ERROR,
! 				(errmsg("could not read xlog records: %s",
  						PQerrorMessage(streamConn))));
  
! 	if (rawlen < sizeof(XLogRecPtr))
! 		ereport(ERROR,
! 				(errmsg("invalid WAL message received from primary")));
! 
! 	/* Return received WAL records to caller */
! 	*recptr = *((XLogRecPtr *) recvBuf);
! 	*buffer = recvBuf + sizeof(XLogRecPtr);
! 	*len = rawlen - sizeof(XLogRecPtr);
  
  	return true;
  }
--- 297,314 ----
  		}
  		PQclear(res);
  		ereport(ERROR,
! 				(errmsg("could not receive data from XLOG stream: %s",
  						PQerrorMessage(streamConn))));
  	}
  	if (rawlen < -1)
  		ereport(ERROR,
! 				(errmsg("could not receive data from XLOG stream: %s",
  						PQerrorMessage(streamConn))));
  
! 	/* Return received messages to caller */
! 	*type = *((unsigned char *) recvBuf);
! 	*buffer = recvBuf + sizeof(*type);
! 	*len = rawlen - sizeof(*type);
  
  	return true;
  }
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 135,140 **** static void WalRcvQuickDieHandler(SIGNAL_ARGS);
--- 135,141 ----
  
  /* Prototypes for private functions */
  static void WalRcvDie(int code, Datum arg);
+ static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
  static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
  static void XLogWalRcvFlush(void);
  
***************
*** 258,264 **** WalReceiverMain(void)
  	/* Loop until end-of-streaming or error */
  	for (;;)
  	{
! 		XLogRecPtr recptr;
  		char   *buf;
  		int		len;
  
--- 259,265 ----
  	/* Loop until end-of-streaming or error */
  	for (;;)
  	{
! 		unsigned char	type;
  		char   *buf;
  		int		len;
  
***************
*** 287,303 **** WalReceiverMain(void)
  		}
  
  		/* Wait a while for data to arrive */
! 		if (walrcv_receive(NAPTIME_PER_CYCLE, &recptr, &buf, &len))
  		{
! 			/* Write received WAL records to disk */
! 			XLogWalRcvWrite(buf, len, recptr);
  
! 			/* Receive any more WAL records we can without sleeping */
! 			while(walrcv_receive(0, &recptr, &buf, &len))
! 				XLogWalRcvWrite(buf, len, recptr);
  
  			/*
! 			 * Now that we've written some records, flush them to disk and
  			 * let the startup process know about them.
  			 */
  			XLogWalRcvFlush();
--- 288,304 ----
  		}
  
  		/* Wait a while for data to arrive */
! 		if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
  		{
! 			/* Accept the received data, and process it */
! 			XLogWalRcvProcessMsg(type, buf, len);
  
! 			/* Receive any more data we can without sleeping */
! 			while(walrcv_receive(0, &type, &buf, &len))
! 				XLogWalRcvProcessMsg(type, buf, len);
  
  			/*
! 			 * If we've written some records, flush them to disk and
  			 * let the startup process know about them.
  			 */
  			XLogWalRcvFlush();
***************
*** 376,381 **** WalRcvQuickDieHandler(SIGNAL_ARGS)
--- 377,412 ----
  }
  
  /*
+  * Accept the message from XLOG stream, and process it.
+  */
+ static void
+ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
+ {
+ 	switch (type)
+ 	{
+ 		case 'w':	/* WAL records */
+ 		{
+ 			XLogRecPtr	recptr;
+ 
+ 			if (len < sizeof(XLogRecPtr))
+ 				ereport(ERROR,
+ 						(errmsg("invalid WAL message received from primary")));
+ 
+ 			recptr = *((XLogRecPtr *) buf);
+ 			buf += sizeof(XLogRecPtr);
+ 			len -= sizeof(XLogRecPtr);
+ 			XLogWalRcvWrite(buf, len, recptr);
+ 			break;
+ 		}
+ 		default:
+ 			ereport(ERROR,
+ 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+ 					 errmsg("invalid replication message type %d",
+ 							type)));
+ 	}
+ }
+ 
+ /*
   * Write XLOG data to disk.
   */
  static void
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 659,664 **** XLogSend(StringInfo outMsg)
--- 659,665 ----
  		 * have the same byte order. If they have different byte order, we
  		 * don't reach here.
  		 */
+ 		pq_sendbyte(outMsg, 'w');
  		pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
  
  		if (endptr.xlogid != startptr.xlogid)
*** a/src/include/replication/walreceiver.h
--- b/src/include/replication/walreceiver.h
***************
*** 66,72 **** extern WalRcvData *WalRcv;
  typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
  extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
  
! typedef bool (*walrcv_receive_type) (int timeout, XLogRecPtr *recptr, char **buffer, int *len);
  extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
  
  typedef void (*walrcv_disconnect_type) (void);
--- 66,73 ----
  typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
  extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
  
! typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
! 									 char **buffer, int *len);
  extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
  
  typedef void (*walrcv_disconnect_type) (void);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to