On Tue, Jan 19, 2010 at 12:20 AM, Heikki Linnakangas <heikki.linnakan...@enterprisedb.com> wrote: > Tom Lane wrote: >> Heikki Linnakangas <heikki.linnakan...@enterprisedb.com> writes: >>> Simon Riggs wrote: >>>> Do we need a new record type for that, is there a handy record type to >>>> bounce from? >> >>> After starting streaming, slices of WAL are sent as CopyData messages. >>> The CopyData payload begins with an XLogRecPtr, followed by the WAL >>> data. That payload format needs to be extended with a 'message type' >>> field and a new message type for the timestamps need to be added. >> >> Whether or not anyone bothers with the timestamp message, I think adding >> a message type header is a Must Fix item. A protocol with no provision >> for extension is certainly going to bite us in the rear before long. > > Agreed a message type header is a good idea, although we don't expect > streaming replication and the protocol to work across different major > versions anyway.
The attached patch adds a message type header to the payload of each CopyData message sent from walsender to walreceiver, to make the replication protocol more extensible. Regards, -- Fujii Masao NIPPON TELEGRAPH AND TELEPHONE CORPORATION NTT Open Source Software Center
*** a/doc/src/sgml/protocol.sgml --- b/doc/src/sgml/protocol.sgml *************** *** 4179,4190 **** The commands accepted in walsender mode are: already been recycled. On success, server responds with a CopyOutResponse message, and backend starts to stream WAL as CopyData messages. </para> <para> ! The payload in each CopyData message consists of an XLogRecPtr, ! indicating the starting point of the WAL in the message, immediately ! followed by the WAL data itself. </para> <para> A single WAL record is never split across two CopyData messages. When --- 4179,4243 ---- The commands accepted in walsender mode are: already been recycled. On success, server responds with a CopyOutResponse message, and backend starts to stream WAL as CopyData messages. + The payload of each CopyData message has the following format. </para> <para> ! <variablelist> ! <varlistentry> ! <term> ! XLogData (B) ! </term> ! <listitem> ! <para> ! <variablelist> ! <varlistentry> ! <term> ! Byte1('w') ! </term> ! <listitem> ! <para> ! Identifies the message as WAL data. ! </para> ! </listitem> ! </varlistentry> ! <varlistentry> ! <term> ! Int32 ! </term> ! <listitem> ! <para> ! The log file number of the LSN (in the server's native byte order), indicating the starting point of ! the WAL in the message. ! </para> ! </listitem> ! </varlistentry> ! <varlistentry> ! <term> ! Int32 ! </term> ! <listitem> ! <para> ! The byte offset of the LSN (in the server's native byte order), indicating the starting point of ! the WAL in the message. ! </para> ! </listitem> ! </varlistentry> ! <varlistentry> ! <term> ! Byte<replaceable>n</replaceable> ! </term> ! <listitem> ! <para> ! Data that forms part of the WAL data stream. ! </para> ! </listitem> ! </varlistentry> ! </variablelist> ! </para> ! </listitem> ! </varlistentry> ! </variablelist> </para> <para> A single WAL record is never split across two CopyData messages.
When *** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c --- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c *************** *** 48,55 **** static char *recvBuf = NULL; /* Prototypes for interface functions */ static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint); ! static bool libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, ! int *len); static void libpqrcv_disconnect(void); /* Prototypes for private functions */ --- 48,55 ---- /* Prototypes for interface functions */ static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint); ! static bool libpqrcv_receive(int timeout, unsigned char *type, ! char **buffer, int *len); static void libpqrcv_disconnect(void); /* Prototypes for private functions */ *************** *** 236,248 **** libpqrcv_disconnect(void) } /* ! * Receive any WAL records available from XLOG stream, blocking for * maximum of 'timeout' ms. * * Returns: * ! * True if data was received. *recptr, *buffer and *len are set to ! * the WAL location of the received data, buffer holding it, and length, * respectively. * * False if no data was available within timeout, or wait was interrupted --- 236,248 ---- } /* ! * Receive any messages available from XLOG stream, blocking for * maximum of 'timeout' ms. * * Returns: * ! * True if data was received. *type, *buffer and *len are set to ! * the type of the received data, buffer holding it, and length, * respectively. * * False if no data was available within timeout, or wait was interrupted *************** *** 254,260 **** libpqrcv_disconnect(void) * ereports on error. */ static bool ! libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len) { int rawlen; --- 254,260 ---- * ereports on error. */ static bool ! 
libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) { int rawlen; *************** *** 275,288 **** libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len) if (PQconsumeInput(streamConn) == 0) ereport(ERROR, ! (errmsg("could not read xlog records: %s", PQerrorMessage(streamConn)))); } justconnected = false; /* Receive CopyData message */ rawlen = PQgetCopyData(streamConn, &recvBuf, 1); ! if (rawlen == 0) /* no records available yet, then return */ return false; if (rawlen == -1) /* end-of-streaming or error */ { --- 275,288 ---- if (PQconsumeInput(streamConn) == 0) ereport(ERROR, ! (errmsg("could not receive data from XLOG stream: %s", PQerrorMessage(streamConn)))); } justconnected = false; /* Receive CopyData message */ rawlen = PQgetCopyData(streamConn, &recvBuf, 1); ! if (rawlen == 0) /* no data available yet, then return */ return false; if (rawlen == -1) /* end-of-streaming or error */ { *************** *** 297,318 **** libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len) } PQclear(res); ereport(ERROR, ! (errmsg("could not read xlog records: %s", PQerrorMessage(streamConn)))); } if (rawlen < -1) ereport(ERROR, ! (errmsg("could not read xlog records: %s", PQerrorMessage(streamConn)))); ! if (rawlen < sizeof(XLogRecPtr)) ! ereport(ERROR, ! (errmsg("invalid WAL message received from primary"))); ! ! /* Return received WAL records to caller */ ! *recptr = *((XLogRecPtr *) recvBuf); ! *buffer = recvBuf + sizeof(XLogRecPtr); ! *len = rawlen - sizeof(XLogRecPtr); return true; } --- 297,314 ---- } PQclear(res); ereport(ERROR, ! (errmsg("could not receive data from XLOG stream: %s", PQerrorMessage(streamConn)))); } if (rawlen < -1) ereport(ERROR, ! (errmsg("could not receive data from XLOG stream: %s", PQerrorMessage(streamConn)))); ! /* Return received messages to caller */ ! *type = *((unsigned char *) recvBuf); ! *buffer = recvBuf + sizeof(*type); ! 
*len = rawlen - sizeof(*type); return true; } *** a/src/backend/replication/walreceiver.c --- b/src/backend/replication/walreceiver.c *************** *** 135,140 **** static void WalRcvQuickDieHandler(SIGNAL_ARGS); --- 135,141 ---- /* Prototypes for private functions */ static void WalRcvDie(int code, Datum arg); + static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(void); *************** *** 258,264 **** WalReceiverMain(void) /* Loop until end-of-streaming or error */ for (;;) { ! XLogRecPtr recptr; char *buf; int len; --- 259,265 ---- /* Loop until end-of-streaming or error */ for (;;) { ! unsigned char type; char *buf; int len; *************** *** 287,303 **** WalReceiverMain(void) } /* Wait a while for data to arrive */ ! if (walrcv_receive(NAPTIME_PER_CYCLE, &recptr, &buf, &len)) { ! /* Write received WAL records to disk */ ! XLogWalRcvWrite(buf, len, recptr); ! /* Receive any more WAL records we can without sleeping */ ! while(walrcv_receive(0, &recptr, &buf, &len)) ! XLogWalRcvWrite(buf, len, recptr); /* ! * Now that we've written some records, flush them to disk and * let the startup process know about them. */ XLogWalRcvFlush(); --- 288,304 ---- } /* Wait a while for data to arrive */ ! if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len)) { ! /* Accept the received data, and process it */ ! XLogWalRcvProcessMsg(type, buf, len); ! /* Receive any more data we can without sleeping */ ! while(walrcv_receive(0, &type, &buf, &len)) ! XLogWalRcvProcessMsg(type, buf, len); /* ! * If we've written some records, flush them to disk and * let the startup process know about them. */ XLogWalRcvFlush(); *************** *** 376,381 **** WalRcvQuickDieHandler(SIGNAL_ARGS) --- 377,412 ---- } /* + * Accept the message from XLOG stream, and process it. 
+ */ + static void + XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) + { + switch (type) + { + case 'w': /* WAL records: XLogRecPtr start position, then WAL bytes */ + { + XLogRecPtr recptr; + + if (len < sizeof(XLogRecPtr)) /* too short to hold the start position */ + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("invalid WAL message received from primary"))); + + memcpy(&recptr, buf, sizeof(XLogRecPtr)); /* buf follows the 1-byte type header, so it may be misaligned; don't dereference it as XLogRecPtr */ + buf += sizeof(XLogRecPtr); + len -= sizeof(XLogRecPtr); + XLogWalRcvWrite(buf, len, recptr); + break; + } + default: /* unknown message types are a protocol violation */ + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid replication message type %d", + type))); + } + } + + /* * Write XLOG data to disk. */ static void *** a/src/backend/replication/walsender.c --- b/src/backend/replication/walsender.c *************** *** 659,664 **** XLogSend(StringInfo outMsg) --- 659,665 ---- * have the same byte order. If they have different byte order, we * don't reach here. */ + pq_sendbyte(outMsg, 'w'); pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr)); if (endptr.xlogid != startptr.xlogid) *** a/src/include/replication/walreceiver.h --- b/src/include/replication/walreceiver.h *************** *** 66,72 **** extern WalRcvData *WalRcv; typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint); extern PGDLLIMPORT walrcv_connect_type walrcv_connect; ! typedef bool (*walrcv_receive_type) (int timeout, XLogRecPtr *recptr, char **buffer, int *len); extern PGDLLIMPORT walrcv_receive_type walrcv_receive; typedef void (*walrcv_disconnect_type) (void); --- 66,73 ---- typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint); extern PGDLLIMPORT walrcv_connect_type walrcv_connect; ! typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type, ! char **buffer, int *len); extern PGDLLIMPORT walrcv_receive_type walrcv_receive; typedef void (*walrcv_disconnect_type) (void);
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers