On Fri, 2011-01-21 at 14:45 +0200, Heikki Linnakangas wrote: > * The UI differs from what was agreed on here: > http://archives.postgresql.org/message-id/4d1dcf5a.7070...@enterprisedb.com.
Patch to add a server_name parameter, plus a mechanism to send info from the standby to the master. While doing that, I refactored the standby's replies into 3 message types instead of just 1. This addresses Fujii's comment that we may not wish to send feedback as often as other replies, but it doesn't yet change when the feedback is sent (nor will I do that anytime soon). It's complete but a rough hack, posted for comments; nothing in it should be surprising. -- Simon Riggs http://www.2ndQuadrant.com/books/ PostgreSQL Development, 24x7 Support, Training and Services
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index ee09468..ff89035 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -95,6 +95,7 @@ static struct } LogstreamResult; static StandbyReplyMessage reply_message; +static StandbyHSFeedbackMessage feedback_message; /* * About SIGTERM handling: @@ -123,6 +124,8 @@ static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(bool dying); static void XLogWalRcvSendReply(void); +static void XLogWalRcvSendHSFeedback(void); +static void XLogWalRcvSendInfo(void); /* Signal handlers */ static void WalRcvSigHupHandler(SIGNAL_ARGS); @@ -303,6 +306,7 @@ WalReceiverMain(void) { got_SIGHUP = false; ProcessConfigFile(PGC_SIGHUP); + /* + * NOTE(review): server_name is PGC_POSTMASTER, so a reload cannot + * change it, and this is the only place the patch sends the info + * message -- it is never sent when the connection is first made. + * Consider sending it once right after connecting instead. + */ + XLogWalRcvSendInfo(); } /* Wait a while for data to arrive */ @@ -317,6 +321,7 @@ WalReceiverMain(void) /* Let the master know that we received some data. */ XLogWalRcvSendReply(); + XLogWalRcvSendHSFeedback(); /* * If we've written some records, flush them to disk and let the @@ -331,6 +336,7 @@ WalReceiverMain(void) * the master anyway, to report any progress in applying WAL. */ XLogWalRcvSendReply(); + XLogWalRcvSendHSFeedback(); } } } @@ -619,40 +625,84 @@ XLogWalRcvSendReply(void) reply_message.apply = GetXLogReplayRecPtr(); reply_message.sendTime = now; + elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", + reply_message.write.xlogid, reply_message.write.xrecoff, + reply_message.flush.xlogid, reply_message.flush.xrecoff, + reply_message.apply.xlogid, reply_message.apply.xrecoff); + + /* Prepend with the message type and send it. */ + buf[0] = 'r'; + memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage)); + walrcv_send(buf, sizeof(StandbyReplyMessage) + 1); +} + +/* + * Send hot standby feedback message to primary, plus the current time, + * in case they don't have a watch.
+ */ +static void +XLogWalRcvSendHSFeedback(void) +{ + char buf[sizeof(StandbyHSFeedbackMessage) + 1]; + TimestampTz now; + + /* + * If the user doesn't want status to be reported to the master, be sure + * to exit before doing anything at all. + */ + if (!hot_standby_feedback || !HotStandbyActive()) + return; + + /* Get current timestamp. */ + now = GetCurrentTimestamp(); + + /* + * Stamp the message with its send time. feedback_message is static and + * nothing else sets this field, so without this the primary would + * always receive zero here. + */ + feedback_message.sendTime = now; + /* * Get the OldestXmin and its associated epoch */ - if (hot_standby_feedback && HotStandbyActive()) { TransactionId nextXid; uint32 nextEpoch; - reply_message.xmin = GetOldestXmin(true, false); + feedback_message.xmin = GetOldestXmin(true, false); /* * Get epoch and adjust if nextXid and oldestXmin are different * sides of the epoch boundary. */ GetNextXidAndEpoch(&nextXid, &nextEpoch); - if (nextXid < reply_message.xmin) + if (nextXid < feedback_message.xmin) nextEpoch--; - reply_message.epoch = nextEpoch; - } - else - { - reply_message.xmin = InvalidTransactionId; - reply_message.epoch = 0; + feedback_message.epoch = nextEpoch; } - elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u", - reply_message.write.xlogid, reply_message.write.xrecoff, - reply_message.flush.xlogid, reply_message.flush.xrecoff, - reply_message.apply.xlogid, reply_message.apply.xrecoff, - reply_message.xmin, - reply_message.epoch); + elog(DEBUG2, "sending xmin %u epoch %u", + feedback_message.xmin, + feedback_message.epoch); /* Prepend with the message type and send it. */ - buf[0] = 'r'; - memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage)); - walrcv_send(buf, sizeof(StandbyReplyMessage) + 1); + buf[0] = 'h'; + memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage)); + walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1); +} + +/* + * Send info message to primary. + */ +static void +XLogWalRcvSendInfo(void) +{ + char buf[sizeof(StandbyInfoMessage) + 1]; + StandbyInfoMessage info_message; + + /* Get current timestamp.
*/ + info_message.sendTime = GetCurrentTimestamp(); + + /* + * servername is a fixed-width field copied verbatim onto the wire, so + * bound the copy by the field size rather than by strlen(ServerName): + * strncpy() zero-fills the unused tail, and the explicit terminator + * keeps the field NUL-terminated even when a long name is truncated. + */ + strncpy(info_message.servername, ServerName, + sizeof(info_message.servername) - 1); + info_message.servername[sizeof(info_message.servername) - 1] = '\0'; + + elog(DEBUG2, "sending servername %s", + info_message.servername); + + /* Prepend with the message type and send it. */ + buf[0] = 'i'; + memcpy(&buf[1], &info_message, sizeof(StandbyInfoMessage)); + walrcv_send(buf, sizeof(StandbyInfoMessage) + 1); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index a6a7a14..e46cd01 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -116,7 +116,10 @@ static void WalSndKill(int code, Datum arg); static bool XLogSend(char *msgbuf, bool *caughtup); static void IdentifySystem(void); static void StartReplication(StartReplicationCmd * cmd); +static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); +static void ProcessStandbyHSFeedbackMessage(void); +static void ProcessStandbyInfoMessage(void); static void ProcessRepliesIfAny(void); @@ -456,42 +459,45 @@ ProcessRepliesIfAny(void) unsigned char firstchar; int r; - r = pq_getbyte_if_available(&firstchar); - if (r < 0) - { - /* unexpected error or EOF */ - ereport(COMMERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("unexpected EOF on standby connection"))); - proc_exit(0); - } - if (r == 0) + /* + * Drain every message the standby has already sent, returning as soon + * as no further input is available without blocking. + */ + for (;;) { - /* no data available without blocking */ - return; - } + r = pq_getbyte_if_available(&firstchar); + if (r < 0) + { + /* unexpected error or EOF */ + ereport(COMMERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected EOF on standby connection"))); + proc_exit(0); + } + if (r == 0) + { + /* no data available without blocking */ + return; + } - /* Handle the very limited subset of commands expected in this phase */ - switch (firstchar) - { - /* - * 'd' means a standby reply wrapped in a CopyData packet.
- */ - case 'd': - ProcessStandbyReplyMessage(); - break; + /* Handle the very limited subset of commands expected in this phase */ + switch (firstchar) + { + /* + * 'd' means a standby reply wrapped in a CopyData packet. + */ + case 'd': + ProcessStandbyMessage(); + break; - /* - * 'X' means that the standby is closing down the socket. - */ - case 'X': - proc_exit(0); + /* + * 'X' means that the standby is closing down the socket. + */ + case 'X': + proc_exit(0); - default: - ereport(FATAL, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid standby closing message type %d", - firstchar))); + default: + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid standby closing message type %d", + firstchar))); + } } } @@ -499,11 +505,9 @@ ProcessRepliesIfAny(void) * Process a status update message received from standby. */ static void -ProcessStandbyReplyMessage(void) +ProcessStandbyMessage(void) { - StandbyReplyMessage reply; char msgtype; - TransactionId newxmin = InvalidTransactionId; resetStringInfo(&reply_message); @@ -523,22 +527,43 @@ ProcessStandbyReplyMessage(void) * one type. */ msgtype = pq_getmsgbyte(&reply_message); - if (msgtype != 'r') + + /* + * Dispatch on the message type: 'r' = WAL position reply, 'h' = hot + * standby feedback, 'i' = info. NOTE(review): the context comment + * above (ending "one type.") appears stale now that there are three. + */ + switch (msgtype) { - ereport(COMMERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("unexpected message type %c", msgtype))); - proc_exit(0); + case 'r': + ProcessStandbyReplyMessage(); + break; + + case 'h': + ProcessStandbyHSFeedbackMessage(); + break; + + case 'i': + ProcessStandbyInfoMessage(); + break; + + default: + ereport(COMMERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected message type %c", msgtype))); + proc_exit(0); } +} + +/* + * Regular reply from standby advising of WAL positions on standby server.
+ */ +static void +ProcessStandbyReplyMessage(void) +{ + StandbyReplyMessage reply; pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage)); - elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u", + elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X", reply.write.xlogid, reply.write.xrecoff, reply.flush.xlogid, reply.flush.xrecoff, - reply.apply.xlogid, reply.apply.xrecoff, - reply.xmin, - reply.epoch); + reply.apply.xlogid, reply.apply.xrecoff); /* * Update shared state for this WalSender process @@ -554,6 +579,22 @@ ProcessStandbyReplyMessage(void) walsnd->apply = reply.apply; SpinLockRelease(&walsnd->mutex); } +} + +/* + * Hot Standby feedback + */ +static void +ProcessStandbyHSFeedbackMessage(void) +{ + StandbyHSFeedbackMessage reply; + TransactionId newxmin = InvalidTransactionId; + + pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage)); + + elog(DEBUG2, "xmin %u epoch %u", + reply.xmin, + reply.epoch); /* * Update the WalSender's proc xmin to allow it to be visible @@ -619,6 +660,26 @@ ProcessStandbyReplyMessage(void) } } +/* + * Info message from standby: its server_name setting plus its send time. + * For now the name is only logged. + */ +static void +ProcessStandbyInfoMessage(void) +{ + StandbyInfoMessage info; + + pq_copymsgbytes(&reply_message, (char *) &info, sizeof(StandbyInfoMessage)); + + /* + * The name comes off the network in a fixed-width field; never trust the + * sender to have NUL-terminated it before using it as a C string. + */ + info.servername[sizeof(info.servername) - 1] = '\0'; + + elog(DEBUG2, "server name %s", info.servername); +} + /* Main loop of walsender process */ static int WalSndLoop(void) diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 55cbf75..de6de82 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -405,6 +405,9 @@ int tcp_keepalives_idle; int tcp_keepalives_interval; int tcp_keepalives_count; +char *ServerName = NULL; + + /* * These variables are all dummies that don't do anything, except in some * cases provide the value for SHOW to display.
The real state is elsewhere @@ -2365,6 +2368,15 @@ static struct config_string ConfigureNamesString[] = }, { + {"server_name", PGC_POSTMASTER, CONN_AUTH_SETTINGS, + gettext_noop("Allows setting of a unique name for this server."), + NULL + }, + &ServerName, + "", NULL, NULL + }, + + { {"temp_tablespaces", PGC_USERSET, CLIENT_CONN_STATEMENT, gettext_noop("Sets the tablespace(s) to use for temporary tables and sort files."), NULL, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 6726733..cbe6fb2 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -56,6 +56,8 @@ # - Connection Settings - +#server_name = '' # optional server name for use when clustering servers + # (change requires restart) #listen_addresses = 'localhost' # what IP address(es) to listen on; # comma-separated list of addresses; # defaults to 'localhost', '*' = all diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index aa8cce5..bf6c262 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -218,6 +218,7 @@ extern int CTimeZone; #define MAXTZLEN 10 /* max TZ name len, not counting tr. null */ +extern char *ServerName; extern bool enableFsync; extern bool allowSystemTableMods; extern PGDLLIMPORT int work_mem; diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h index da94b6b..48cb333 100644 --- a/src/include/replication/walprotocol.h +++ b/src/include/replication/walprotocol.h @@ -56,6 +56,18 @@ typedef struct XLogRecPtr flush; XLogRecPtr apply; + /* Sender's system clock at the time of transmission */ + TimestampTz sendTime; +} StandbyReplyMessage; + +/* + * Hot Standby feedback from standby (message type 'h'). This is wrapped within + * a CopyData message at the FE/BE protocol level. + * + * Note that the data length is not specified here. + */ +typedef struct +{ /* * The current xmin and epoch from the standby, for Hot Standby feedback.
* This may be invalid if the standby-side does not support feedback, @@ -64,10 +76,23 @@ typedef struct TransactionId xmin; uint32 epoch; + /* Sender's system clock at the time of transmission */ + TimestampTz sendTime; +} StandbyHSFeedbackMessage; + +/* + * Info message from standby (message type 'i'). This is wrapped within + * a CopyData message at the FE/BE protocol level. + * + * Note that the data length is not specified here. + */ +typedef struct +{ + char servername[64]; /* sending standby's server_name, fixed width; NOTE(review): receivers should not assume the sender NUL-terminated it */ /* Sender's system clock at the time of transmission */ TimestampTz sendTime; -} StandbyReplyMessage; +} StandbyInfoMessage; /* * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers