Having gone through the patch now in more detail, I think it's in pretty good shape. I'm happy with the overall design, except that I haven't been able to make up my mind whether walreceiver should indeed be a stand-alone program as discussed, or a postmaster child process as in the patch you submitted. Putting that question aside for a moment, here are some minor things, in no particular order:
- The async API in PQgetXLogData is quite different from the other commands. It's close to the API from PQgetCopyData(), but doesn't return a malloc'd buffer like PQgetCopyData does. I presume that's to optimize away the extra memcpy step? I don't think that's really necessary, I don't recall any complaints about that in PQgetCopyData(), and if it does become an issue, it could be optimized away by mallocing the buffer first and reading directly to that.

- Can we avoid sprinkling XLogStreamingAllowed() calls to places where we check if WAL-logging is required (nbtsort.c, copy.c etc.). I think we need a new macro to encapsulate (XLogArchivingActive() || XLogStreamingAllowed()).

- Is O_DIRECT ever a good idea in walreceiver? If it's really direct and doesn't get cached, the startup process will need to read from disk.

- Can we replace read/write_conninfo with just a long-enough field in shared mem? Would be simpler. (this is moot if we go with the stand-alone walreceiver program and pass it as a command-line argument)

- walreceiver shouldn't die on connection error, just to be restarted by startup process. Can we add error handling a la bgwriter and have a retry loop within walreceiver? (again, if we go with a stand-alone walreceiver program, it's probably better to have startup process responsible to restart walreceiver, as it is now)

- pq_wait in backend waits until you can read or write at least 1 byte. There is no guarantee that you can send or read the whole message without blocking. We'd have to put the socket in non-blocking mode for that. I'm not sure what the implications of this are.

- we should include system_identifier somewhere in the replication startup handshake. Otherwise you can connect to server from a different system and have logs shipped, if they happen to be roughly at the same point in WAL. Replay will almost certainly fail, but we should error earlier.
- I know I said we should have just asynchronous replication at first, but looking ahead, how would you do synchronous? What kind of signaling is needed between walreceiver and startup process for that?

- 'replication' shouldn't be a real database.

I found the paging logic in walsender confusing, and didn't like the idea that walsender needs to set the XLOGSTREAM_END_SEG flag. Surely walreceiver knows how to split the WAL into files without such a flag. I reworked that logic, I think it's easier to understand now. I kept the support for the flag in libpq and the protocol for now, but it should be removed too, or repurposed to indicate that pg_switch_xlog() was done in the master. I've pushed that to 'replication-orig' branch in my git repository, attached is the same as a diff against your SR_0914.patch.

I need a break from this patch, so I'll take a closer look at Simon's hot standby now. Meanwhile, can you work on the above items and submit a new version, please?

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
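As a rough illustration of the macro suggested in the list above: the sketch below wraps the two checks in a single test, so call sites such as nbtsort.c or copy.c would only need one condition. The name XLogIsNeeded() is an assumption made up for this example, and the two checks are stubbed out as plain functions over hypothetical flags so the snippet compiles on its own; in the server they would be the existing XLogArchivingActive() and the patch's XLogStreamingAllowed().

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the server's settings, for illustration only. */
static bool archive_mode_enabled = false;
static bool streaming_allowed = false;

/* Stubs for the two existing checks; the real ones live in the server. */
static bool XLogArchivingActive(void) { return archive_mode_enabled; }
static bool XLogStreamingAllowed(void) { return streaming_allowed; }

/* Assumed name for the combined check: is WAL needed by archiving or streaming? */
#define XLogIsNeeded() (XLogArchivingActive() || XLogStreamingAllowed())

/*
 * Example call site: a bulk operation may skip WAL-logging only if
 * neither archiving nor streaming needs the WAL.
 */
static bool
can_skip_wal(void)
{
	return !XLogIsNeeded();
}
```

With something like this, adding a third consumer of WAL later would mean touching only the macro, not every call site.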
*** a/src/backend/access/transam/recovery.conf.sample --- b/src/backend/access/transam/recovery.conf.sample *************** *** 2,10 **** # PostgreSQL recovery config file # ------------------------------- # ! # Edit this file to provide the parameters that PostgreSQL ! # needs to perform an archive recovery of a database, or ! # a log-streaming replication. # # If "recovery.conf" is present in the PostgreSQL data directory, it is # read on postmaster startup. After successful recovery, it is renamed --- 2,10 ---- # PostgreSQL recovery config file # ------------------------------- # ! # Edit this file to provide the parameters that PostgreSQL needs to ! # perform an archive recovery of a database, or to act as a log-streaming ! # replication standby. # # If "recovery.conf" is present in the PostgreSQL data directory, it is # read on postmaster startup. After successful recovery, it is renamed *************** *** 83,89 **** #--------------------------------------------------------------------------- # # When standby_mode is enabled, the PostgreSQL server will work as ! # the standby. It tries to connect to the primary according to the # connection settings primary_conninfo, and receives XLOG records # continuously. # --- 83,89 ---- #--------------------------------------------------------------------------- # # When standby_mode is enabled, the PostgreSQL server will work as ! # a standby. It tries to connect to the primary according to the # connection settings primary_conninfo, and receives XLOG records # continuously. # *** a/src/backend/access/transam/xlog.c --- b/src/backend/access/transam/xlog.c *************** *** 2645,2653 **** XLogFileClose(void) * WAL segment files will not be re-read in normal operation, so we advise * the OS to release any cached pages. But do not do so if WAL archiving * or streaming is active, because archiver and walsender process could use ! * the cache to read the WAL segment, respectively. Also, don't bother ! 
* with it if we are using O_DIRECT, since the kernel is presumably not ! * caching in that case. */ #if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED) if (!XLogArchivingActive() && !WalSndInProgress() && --- 2645,2653 ---- * WAL segment files will not be re-read in normal operation, so we advise * the OS to release any cached pages. But do not do so if WAL archiving * or streaming is active, because archiver and walsender process could use ! * the cache to read the WAL segment. Also, don't bother with it if we ! * are using O_DIRECT, since the kernel is presumably not caching in that ! * case. */ #if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED) if (!XLogArchivingActive() && !WalSndInProgress() && *************** *** 3481,3487 **** FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) startlsn.xlogid, startlsn.xrecoff))); } ! return ReadRecord(RecPtr, emode); } /* --- 3481,3487 ---- startlsn.xlogid, startlsn.xrecoff))); } ! return ReadRecord(RecPtr, emode); } /* *************** *** 5284,5290 **** exitStreamingRecovery(void) */ ShutdownWalRcv(); ! /* We are no longer in streaming recovery state */ InStreamingRecovery = false; ereport(LOG, --- 5284,5290 ---- */ ShutdownWalRcv(); ! /* We are no longer in streaming recovery state */ InStreamingRecovery = false; ereport(LOG, *** a/src/backend/postmaster/postmaster.c --- b/src/backend/postmaster/postmaster.c *************** *** 289,295 **** typedef enum PM_WAIT_BACKENDS, /* waiting for live backends to exit */ PM_SHUTDOWN, /* waiting for bgwriter to do shutdown ckpt */ PM_SHUTDOWN_2, /* waiting for archiver to finish */ - PM_SHUTDOWN_3, /* waiting for walsenders to finish */ PM_WAIT_DEAD_END, /* waiting for dead_end children to exit */ PM_NO_CHILDREN /* all important children have exited */ } PMState; --- 289,294 ---- *************** *** 1640,1646 **** retry1: if (proto == XLOG_STREAMING_CODE && !am_walsender) { am_walsender = true; ! 
/* No packets other than regular one should not follow */ return ProcessStartupPacket(port, SSLdone); } --- 1639,1645 ---- if (proto == XLOG_STREAMING_CODE && !am_walsender) { am_walsender = true; ! /* No packets other than regular one should follow */ return ProcessStartupPacket(port, SSLdone); } *************** *** 2404,2420 **** reaper(SIGNAL_ARGS) */ Assert(Shutdown > NoShutdown); ! if (PgArchPID != 0) { /* Waken archiver for the last time */ ! signal_child(PgArchPID, SIGUSR2); ! pmState = PM_SHUTDOWN_2; ! } ! else if (WalSndInProgress()) ! { /* Waken walsenders for the last time */ SignalWalSenders(SIGUSR2); ! pmState = PM_SHUTDOWN_3; } else pmState = PM_WAIT_DEAD_END; --- 2403,2418 ---- */ Assert(Shutdown > NoShutdown); ! if (PgArchPID != 0 || WalSndInProgress()) { /* Waken archiver for the last time */ ! if (PgArchPID != 0) ! signal_child(PgArchPID, SIGUSR2); ! /* Waken walsenders for the last time */ SignalWalSenders(SIGUSR2); ! ! pmState = PM_SHUTDOWN_2; } else pmState = PM_WAIT_DEAD_END; *************** *** 2499,2510 **** reaper(SIGNAL_ARGS) ((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) && WalRcvInProgress()))) PgArchPID = pgarch_start(); ! else if (pmState == PM_SHUTDOWN_2 && WalSndInProgress()) ! { ! SignalWalSenders(SIGUSR2); ! pmState = PM_SHUTDOWN_3; ! } ! else pmState = PM_WAIT_DEAD_END; continue; } --- 2497,2503 ---- ((pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT) && WalRcvInProgress()))) PgArchPID = pgarch_start(); ! else if (pmState == PM_SHUTDOWN_2 && !WalSndInProgress()) pmState = PM_WAIT_DEAD_END; continue; } *************** *** 2611,2618 **** CleanupBackend(int pid, * advance to the next shutdown step. */ if (bp->child_type == BACKEND_TYPE_WALSND && ! pmState == PM_SHUTDOWN_3 && ! !WalSndInProgress()) pmState = PM_WAIT_DEAD_END; } DLRemove(curr); --- 2604,2611 ---- * advance to the next shutdown step. */ if (bp->child_type == BACKEND_TYPE_WALSND && ! pmState == PM_SHUTDOWN_2 && ! 
!WalSndInProgress() && PgArchPID == 0) pmState = PM_WAIT_DEAD_END; } DLRemove(curr); *** a/src/backend/postmaster/walreceiver.c --- b/src/backend/postmaster/walreceiver.c *************** *** 100,108 **** static void WalRcvQuickDieHandler(SIGNAL_ARGS); static void WalRcvLoop(void); static void InitWalRcv(void); static void WalRcvKill(int code, Datum arg); ! static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced); ! static void XLogWalRcvFlush(XLogRecPtr recptr); ! static void WritePhysicalXLog(char *from, Size nbytes, int startoff); static char *read_conninfo_file(void); /* Main entry point for walreceiver process */ --- 100,107 ---- static void WalRcvLoop(void); static void InitWalRcv(void); static void WalRcvKill(int code, Datum arg); ! static void XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr); ! static void XLogWalRcvFlush(void); static char *read_conninfo_file(void); /* Main entry point for walreceiver process */ *************** *** 228,235 **** WalRcvLoop(void) /* Loop until end-of-streaming or error */ for (;;) { - bool fsynced = false; - /* * Emergency bailout if postmaster has died. This is to avoid the * necessity for manual cleanup of all postmaster children. --- 227,232 ---- *************** *** 298,304 **** WalRcvLoop(void) * can recover all transactions from the primary). */ ! XLogWalRcvWrite(buf, len, recptr, &fsynced); /* * The logs in the XLogData message were written successfully, --- 295,301 ---- * can recover all transactions from the primary). */ ! XLogWalRcvWrite(buf, len, recptr); /* * The logs in the XLogData message were written successfully, *************** *** 307,357 **** WalRcvLoop(void) PQmarkConsumed(streamConn); /* ! * If fsync is not requested or was already done, we send a "success" ! * to the primary before issuing fsync for end-of-segment. */ ! if (fsynced || !fsync_requested) ! { ! if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff, ! (int) fsynced) == -1) ! ereport(FATAL, ! 
(errmsg("could not send a message to the primary: %s", ! PQerrorMessage(streamConn)))); ! } ! ! /* ! * If we just wrote the whole last page of a logfile segment but ! * had not fsynced it yet, fsync the segment immediately. This ! * avoids having to go back and re-open prior segments when an ! * fsync request comes along later. ! * ! * Of course, if asked to fsync but not, do so. ! */ ! if (!fsynced && (fsync_requested || finishing_seg)) ! { ! XLogWalRcvFlush(recptr); ! ! if (PQputXLogRecPtr(streamConn, recptr.xlogid, recptr.xrecoff, ! 1) == -1) ! ereport(FATAL, ! (errmsg("could not send a message to the primary: %s", ! PQerrorMessage(streamConn)))); ! ! /* ! * If the segment is ready to copy to archival storage, ! * notify the archiver so. ! */ ! if (finishing_seg && XLogArchivingActive()) ! XLogArchiveNotifySeg(recvId, recvSeg); ! ! /* ! * XXX: Should we signal bgwriter to start a restartpoint ! * if we've consumed too much xlog since the last one, like ! * in normal processing? But this is not worth doing unless ! * a restartpoint can be created independently from a ! * checkpoint record. ! */ ! } } if (len == -1) /* end-of-streaming */ --- 304,314 ---- PQmarkConsumed(streamConn); /* ! * If the primary requested us to fsync, do so now and send ! * and acknowledgement. */ ! if (fsync_requested) ! XLogWalRcvFlush(); } if (len == -1) /* end-of-streaming */ *************** *** 511,589 **** WalRcvInProgress(void) * fsynced is set to true if the log was fsyned by O_DIRECT. */ static void ! XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced) { int startoff; ! int endoff; ! START_CRIT_SECTION(); ! if (!XLByteInPrevSeg(recptr, recvId, recvSeg)) { ! bool use_existent; ! /* ! * XLOG segment files will be re-read in recovery operation soon, ! * so we don't need to advise the OS to release any cache page. ! */ ! if (recvFile >= 0 && close(recvFile)) ereport(PANIC, (errcode_for_file_access(), ! errmsg("could not close log file %u, segment %u: %m", ! 
recvId, recvSeg))); ! recvFile = -1; ! ! /* Create/use new log file */ ! XLByteToPrevSeg(recptr, recvId, recvSeg); ! use_existent = true; ! recvFile = XLogFileInit(recvId, recvSeg, ! &use_existent, true); ! recvOff = 0; ! } ! /* Make sure we have the current logfile open */ ! if (recvFile < 0) ! { ! XLByteToPrevSeg(recptr, recvId, recvSeg); ! recvFile = XLogFileOpen(recvId, recvSeg); ! recvOff = 0; ! } ! /* Calculate the start/end file offset of the received logs */ ! endoff = recptr.xrecoff % XLogSegSize; ! startoff = ((endoff == 0) ? XLogSegSize : endoff) - len; /* ! * Re-zero the page so that bytes beyond what we've written will look ! * like zeroes and not valid XLOG records. Only end page which we are ! * writing need to be zeroed. Of course, we can skip zeroing the pages ! * full of the XLOG records. Save the end position of the already zeroed ! * area at the variable ZeroedRecPtr, and avoid zeroing the same page ! * two or more times. * * This must precede the writing of the actual logs. Otherwise, a crash ! * before re-zeroing would cause a corrupted page. */ ! if (XLByteLT(ZeroedRecPtr, recptr) && endoff % XLOG_BLCKSZ != 0) { int zlen; ! zlen = XLOG_BLCKSZ - endoff % XLOG_BLCKSZ; ! WritePhysicalXLog(ZeroedBuffer, zlen, endoff); ZeroedRecPtr = recptr; ZeroedRecPtr.xrecoff += zlen; - } ! /* Write out the logs */ ! WritePhysicalXLog(buf, len, startoff); ! LogstreamResult.Send = recptr; ! LogstreamResult.Write = recptr; ! ! if (sync_method == SYNC_METHOD_OPEN || ! sync_method == SYNC_METHOD_OPEN_DSYNC) ! { ! LogstreamResult.Flush = recptr; ! *fsynced = true; /* logs were already fsynced */ } /* Update shared-memory status */ --- 468,623 ---- * fsynced is set to true if the log was fsyned by O_DIRECT. */ static void ! XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) { int startoff; ! int byteswritten; ! START_CRIT_SECTION(); /* XXX: Why? */ ! while (nbytes > 0) { ! int segbytes; ! uint32 tmp; ! 
if (recvFile < 0 || !XLByteInSeg(recptr, recvId, recvSeg)) ! { ! bool use_existent; ! ! /* ! * XLOG segment files will be re-read in recovery operation soon, ! * so we don't need to advise the OS to release any cache page. ! */ ! if (recvFile >= 0) ! { ! /* ! * fsync() before we switch to next file. We would otherwise ! * have to reopen this file to fsync it later ! */ ! XLogWalRcvFlush(); ! if (close(recvFile) != 0) ! ereport(PANIC, ! (errcode_for_file_access(), ! errmsg("could not close log file %u, segment %u: %m", ! recvId, recvSeg))); ! } ! recvFile = -1; ! ! /* Create/use new log file */ ! XLByteToSeg(recptr, recvId, recvSeg); ! use_existent = true; ! recvFile = XLogFileInit(recvId, recvSeg, ! &use_existent, true); ! recvOff = 0; ! } ! ! /* Calculate the start offset of the received logs */ ! startoff = recptr.xrecoff % XLogSegSize; ! ! if (startoff + nbytes > XLOG_SEG_SIZE) ! segbytes = XLOG_SEG_SIZE - startoff; ! else ! segbytes = nbytes; ! ! /* Need to seek in the file? */ ! if (recvOff != startoff) ! { ! if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0) ! ereport(PANIC, ! (errcode_for_file_access(), ! errmsg("could not seek in log file %u, " ! "segment %u to offset %u: %m", ! recvId, recvSeg, startoff))); ! recvOff = startoff; ! } ! ! /* OK to write the logs */ ! errno = 0; ! ! byteswritten = write(recvFile, buf, segbytes); ! if (byteswritten <= 0) ! { ! /* if write didn't set errno, assume no disk space */ ! if (errno == 0) ! errno = ENOSPC; ereport(PANIC, (errcode_for_file_access(), ! errmsg("could not write to log file %u, segment %u " ! "at offset %u, length %lu: %m", ! recvId, recvSeg, ! recvOff, (unsigned long) segbytes))); ! } ! /* Update state for read */ ! tmp = recptr.xrecoff + byteswritten; ! if (tmp < recptr.xrecoff) ! recptr.xlogid++; /* overflow */ ! recptr.xrecoff = tmp; ! recvOff += byteswritten; ! nbytes -= byteswritten; ! buf += byteswritten; ! ! LogstreamResult.Send = recptr; ! LogstreamResult.Write = recptr; ! ! 
if (sync_method == SYNC_METHOD_OPEN || ! sync_method == SYNC_METHOD_OPEN_DSYNC) ! { ! LogstreamResult.Flush = recptr; ! } ! ! /* ! * If the segment is ready to copy to archival storage, ! * notify the archiver so. ! */ ! if ((recptr.xrecoff % XLOG_SEG_SIZE == 0) && XLogArchivingActive()) ! XLogArchiveNotifySeg(recvId, recvSeg); ! ! /* ! * XXX: Should we signal bgwriter to start a restartpoint ! * if we've consumed too much xlog since the last one, like ! * in normal processing? But this is not worth doing unless ! * a restartpoint can be created independently from a ! * checkpoint record. ! */ ! } /* ! * Zero the rest of the last page we wrote to, so that bytes beyond what ! * we've written will look like zeroes and not valid XLOG records. Save ! * the end position of the already zeroed area at the variable ! * ZeroedRecPtr, and avoid zeroing the same page two or more times. * * This must precede the writing of the actual logs. Otherwise, a crash ! * before re-zeroing would cause a corrupted page. XXX: that's not really ! * an issue, a hard crash could leave the page half-flushed anyway. And we ! * have CRC to protect from that anyway, this zeroing business isn't ! * absolutely necessary anyway. */ ! if (XLByteLT(ZeroedRecPtr, recptr) && recptr.xrecoff % XLOG_BLCKSZ != 0) { int zlen; ! zlen = XLOG_BLCKSZ - recptr.xrecoff % XLOG_BLCKSZ; ! ! byteswritten = write(recvFile, ZeroedBuffer, zlen); ! if (byteswritten != zlen) ! { ! /* if write didn't set errno, assume no disk space */ ! if (errno == 0) ! errno = ENOSPC; ! ereport(PANIC, ! (errcode_for_file_access(), ! errmsg("could not write to log file %u, segment %u " ! "at offset %u, length %lu: %m", ! recvId, recvSeg, ! recvOff, (unsigned long) nbytes))); ! } ZeroedRecPtr = recptr; ZeroedRecPtr.xrecoff += zlen; ! 
recvOff += byteswritten; } /* Update shared-memory status */ *************** *** 594,600 **** XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced) SpinLockAcquire(&walrcv->mutex); XLByteUpdate(LogstreamResult.Send, walrcv->LogstreamResult.Send); XLByteUpdate(LogstreamResult.Write, walrcv->LogstreamResult.Write); ! if (*fsynced) XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush); SpinLockRelease(&walrcv->mutex); } --- 628,635 ---- SpinLockAcquire(&walrcv->mutex); XLByteUpdate(LogstreamResult.Send, walrcv->LogstreamResult.Send); XLByteUpdate(LogstreamResult.Write, walrcv->LogstreamResult.Write); ! if (sync_method == SYNC_METHOD_OPEN || ! sync_method == SYNC_METHOD_OPEN_DSYNC) XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush); SpinLockRelease(&walrcv->mutex); } *************** *** 607,666 **** XLogWalRcvWrite(char *buf, Size len, XLogRecPtr recptr, bool *fsynced) /* Flush the log to disk */ static void ! XLogWalRcvFlush(XLogRecPtr recptr) { ! START_CRIT_SECTION(); ! ! issue_xlog_fsync(recvFile, recvId, recvSeg); ! ! LogstreamResult.Flush = recptr; ! ! /* Update shared-memory status */ { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; SpinLockAcquire(&walrcv->mutex); XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush); SpinLockRelease(&walrcv->mutex); - } - - END_CRIT_SECTION(); - } ! /* Physical write to the given logs */ ! static void ! WritePhysicalXLog(char *from, Size nbytes, int startoff) ! { ! /* Need to seek in the file? */ ! if (recvOff != startoff) ! { ! if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0) ! ereport(PANIC, ! (errcode_for_file_access(), ! errmsg("could not seek in log file %u, " ! "segment %u to offset %u: %m", ! recvId, recvSeg, startoff))); ! recvOff = startoff; ! } ! /* OK to write the logs */ ! errno = 0; ! if (write(recvFile, from, nbytes) != nbytes) ! { ! /* if write didn't set errno, assume no disk space */ ! 
if (errno == 0) ! errno = ENOSPC; ! ereport(PANIC, ! (errcode_for_file_access(), ! errmsg("could not write to log file %u, segment %u " ! "at offset %u, length %lu: %m", ! recvId, recvSeg, ! recvOff, (unsigned long) nbytes))); } - - /* Update state for write */ - recvOff += nbytes; } /* --- 642,674 ---- /* Flush the log to disk */ static void ! XLogWalRcvFlush(void) { ! if (XLByteLT(LogstreamResult.Flush, LogstreamResult.Write)) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; + START_CRIT_SECTION(); + + issue_xlog_fsync(recvFile, recvId, recvSeg); + + LogstreamResult.Flush = LogstreamResult.Write; + + /* Update shared-memory status */ SpinLockAcquire(&walrcv->mutex); XLByteUpdate(LogstreamResult.Flush, walrcv->LogstreamResult.Flush); SpinLockRelease(&walrcv->mutex); ! END_CRIT_SECTION(); ! /* Let the primary know */ ! if (PQputXLogRecPtr(streamConn, LogstreamResult.Flush.xlogid, ! LogstreamResult.Flush.xrecoff, 1) == -1) ! ereport(FATAL, ! (errmsg("could not send a message to the primary: %s", ! PQerrorMessage(streamConn)))); } } /* *** a/src/backend/postmaster/walsender.c --- b/src/backend/postmaster/walsender.c *************** *** 113,122 **** static void WalSndQuickDieHandler(SIGNAL_ARGS); static int WalSndLoop(void); static void InitWalSnd(void); static void WalSndKill(int code, Datum arg); ! static void XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr); static bool XLogSend(PendingMessage inMsg, PendingMessage outMsg); static bool ProcessStreamMsgs(PendingMessage inMsg); /* Main entry point for walsender process */ int WalSenderMain(void) --- 113,127 ---- static int WalSndLoop(void); static void InitWalSnd(void); static void WalSndKill(int code, Datum arg); ! static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes); static bool XLogSend(PendingMessage inMsg, PendingMessage outMsg); static bool ProcessStreamMsgs(PendingMessage inMsg); + /* + * How much WAL to send in one message? 
Must be >= XLOG_BLCKSZ. + */ + #define MAX_SEND_SIZE (XLOG_SEG_SIZE / 2) + /* Main entry point for walsender process */ int WalSenderMain(void) *************** *** 382,400 **** WalSndKill(int code, Datum arg) } /* ! * Read the log into buffer. ! * ! * startoff is the file offset where we start reading the log from; nbytes is ! * the number of bytes which needs to be read; recptr is the last byte + 1 to ! * read. */ void ! XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr) { char path[MAXPGPATH]; ! ! /* Don't cross a segment boundary */ ! Assert(startoff + nbytes <= XLogSegSize); #ifdef REPLICATION_DEBUG if (REPLICATION_DEBUG_ENABLED) --- 387,399 ---- } /* ! * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr' */ void ! XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) { char path[MAXPGPATH]; ! uint32 startoff; #ifdef REPLICATION_DEBUG if (REPLICATION_DEBUG_ENABLED) *************** *** 404,464 **** XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr) LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff); #endif ! if (!XLByteInPrevSeg(recptr, sendId, sendSeg)) { ! /* Switch to another logfile segment */ ! if (sendFile >= 0) ! close(sendFile); ! XLByteToPrevSeg(recptr, sendId, sendSeg); ! XLogFilePath(path, ThisTimeLineID, sendId, sendSeg); ! sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); ! if (sendFile < 0) ! ereport(FATAL, ! (errcode_for_file_access(), ! errmsg("could not open file \"%s\" (log file %u, segment %u): %m", ! path, sendId, sendSeg))); ! sendOff = 0; ! } ! /* Make sure we have the current logfile open */ ! if (sendFile < 0) ! { ! XLByteToPrevSeg(recptr, sendId, sendSeg); ! XLogFilePath(path, ThisTimeLineID, sendId, sendSeg); ! sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); ! if (sendFile < 0) ! ereport(FATAL, ! (errcode_for_file_access(), ! errmsg("could not open file \"%s\" (log file %u, segment %u): %m", ! path, sendId, sendSeg))); ! sendOff = 0; ! } ! 
/* Need to seek in the file? */ ! if (sendOff != startoff) ! { ! if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) ereport(FATAL, (errcode_for_file_access(), ! errmsg("could not seek in log file %u, segment %u to offset %u: %m", ! sendId, sendSeg, startoff))); ! sendOff = startoff; ! } ! ! if (read(sendFile, buf, nbytes) != nbytes) ! { ! ereport(FATAL, ! (errcode_for_file_access(), ! errmsg("could not read from log file %u, segment %u, offset %u, " ! "length %lu: %m", ! sendId, sendSeg, sendOff, (unsigned long) nbytes))); } - - /* Update state for read */ - sendOff += nbytes; } /* --- 403,469 ---- LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff); #endif ! while (nbytes > 0) { ! int segbytes; ! int readbytes; ! uint32 tmp; ! startoff = recptr.xrecoff % XLOG_SEG_SIZE; ! if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg)) ! { ! /* Switch to another logfile segment */ ! if (sendFile >= 0) ! close(sendFile); ! ! XLByteToSeg(recptr, sendId, sendSeg); ! XLogFilePath(path, ThisTimeLineID, sendId, sendSeg); ! ! sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); ! if (sendFile < 0) ! ereport(FATAL, /* XXX: Why FATAL? */ ! (errcode_for_file_access(), ! errmsg("could not open file \"%s\" (log file %u, segment %u): %m", ! path, sendId, sendSeg))); ! sendOff = 0; ! } ! /* Need to seek in the file? */ ! if (sendOff != startoff) ! { ! if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) ! ereport(FATAL, ! (errcode_for_file_access(), ! errmsg("could not seek in log file %u, segment %u to offset %u: %m", ! sendId, sendSeg, startoff))); ! sendOff = startoff; ! } ! /* How many bytes are within this segment? */ ! if (nbytes > (XLOG_SEG_SIZE - startoff)) ! segbytes = XLOG_SEG_SIZE - startoff; ! else ! segbytes = nbytes; ! readbytes = read(sendFile, buf, segbytes); ! if (readbytes <= 0) ereport(FATAL, (errcode_for_file_access(), ! errmsg("could not read from log file %u, segment %u, offset %u, " ! "length %lu: %m", ! 
sendId, sendSeg, sendOff, (unsigned long) segbytes))); ! ! /* Update state for read */ ! tmp = recptr.xrecoff + readbytes; ! if (tmp < recptr.xrecoff) ! recptr.xlogid++; /* overflow */ ! recptr.xrecoff = tmp; ! ! sendOff += readbytes; ! nbytes -= readbytes; ! buf += readbytes; } } /* *************** *** 469,488 **** XLogRead(char *buf, uint32 startoff, Size nbytes, XLogRecPtr recptr) static bool XLogSend(PendingMessage inMsg, PendingMessage outMsg) { - bool ispartialpage; - bool last_iteration; - bool finishing_seg; - int nmsgs; - int npages; int res; - uint32 startpos; - uint32 startoff; - uint32 endpos; XLogRecPtr SendRqstPtr; /* ! * Invalid position means that XLOG streaming is not started yet, ! * so we do nothing here. */ if (XLogRecPtrIsInvalid(LogstreamResult.Send)) return true; --- 474,486 ---- static bool XLogSend(PendingMessage inMsg, PendingMessage outMsg) { int res; XLogRecPtr SendRqstPtr; /* ! * Invalid position means that we have not yet received the initial ! * XLogRecPtr message from the slave that indicates where to start the ! * streaming. */ if (XLogRecPtrIsInvalid(LogstreamResult.Send)) return true; *************** *** 490,495 **** XLogSend(PendingMessage inMsg, PendingMessage outMsg) --- 488,497 ---- /* Attempt to send all the records which were written to the disk */ SendRqstPtr = GetWriteRecPtr(); + /* Quick exit if nothing to do */ + if (!XLByteLT(LogstreamResult.Send, SendRqstPtr)) + return true; + #ifdef REPLICATION_DEBUG if (REPLICATION_DEBUG_ENABLED) elog(LOG, "xlog send request %X/%X; send %X/%X; write %X/%X", *************** *** 520,631 **** XLogSend(PendingMessage inMsg, PendingMessage outMsg) * sending in the last page. We must initialize all of them to * keep the compiler quiet. */ - nmsgs = 0; - npages = 0; - startpos = 0; - startoff = 0; - endpos = XLOG_BLCKSZ; while (XLByteLT(LogstreamResult.Send, SendRqstPtr)) { /* ! * Advance LogstreamResult.Send to end of current page. If this ! 
* is a first loop iteration (i.e., in the case where npages is 0), ! * it might indicate a halfway position or cross a logid boundary, ! * so alignment is needed. Otherwise, since it's guaranteed that ! * LogstreamResult.Send indicates end of previous page and we have ! * not crossed a logid boundary yet in this loop iteration, ! * we have only to increment it by XLOG_BLCKSZ bytes. */ ! if (npages == 0) ! { ! startpos = LogstreamResult.Send.xrecoff % XLOG_BLCKSZ; ! startoff = LogstreamResult.Send.xrecoff % XLogSegSize - startpos; ! LogstreamResult.Send.xrecoff += XLOG_BLCKSZ - startpos; ! if (LogstreamResult.Send.xrecoff > XLogFileSize) ! { ! LogstreamResult.Send.xlogid++; ! LogstreamResult.Send.xrecoff %= XLogFileSize; ! } ! } ! else ! LogstreamResult.Send.xrecoff += XLOG_BLCKSZ; ! ispartialpage = XLByteLT(SendRqstPtr, LogstreamResult.Send); ! npages++; /* ! * Read and send the set if this will be the last loop iteration, ! * or if the number of pages in the set is larger than ! * MaxPagesPerXLogData, or if we are at the end of the logfile ! * segment. */ - last_iteration = !XLByteLT(LogstreamResult.Send, SendRqstPtr); - if (last_iteration) - { - endpos = SendRqstPtr.xrecoff % XLOG_BLCKSZ; - if (endpos == 0) - endpos = XLOG_BLCKSZ; - } - - finishing_seg = !ispartialpage && - (startoff + npages * XLOG_BLCKSZ) >= XLogSegSize; ! /* Only asked to send a partial page */ ! if (ispartialpage) ! LogstreamResult.Send = SendRqstPtr; ! if (last_iteration || ! npages >= MaxPagesPerXLogData || ! finishing_seg) { ! Size nbytes; ! uint8 flags = 0; ! ! if (finishing_seg) ! flags |= XLOGSTREAM_END_SEG; ! ! /* ! * XXX: Should we request the standby to fsync the log if the ! * current set might include a shutdown checkpoint record? ! */ ! ! /* OK to read and send the log */ ! pq_beginasyncmsg(outMsg, 'w'); ! pq_sendint(outMsg->buf, flags, 1); ! pq_sendint(outMsg->buf, LogstreamResult.Send.xlogid, 4); ! pq_sendint(outMsg->buf, LogstreamResult.Send.xrecoff, 4); ! ! 
nbytes = (npages - 1) * (Size) XLOG_BLCKSZ - startpos + endpos; ! ! /* ! * Read the log into the output buffer directly to prevent ! * extra memcpy calls. ! */ ! XLogRead(BufferGetStringInfo(outMsg->buf, nbytes), ! startoff + startpos, nbytes, LogstreamResult.Send); ! res = pq_endasyncmsg(outMsg); ! if (res < 0) ! return false; ! if (res == 0) ! break; ! /* ! * Stop sending the log for another job (e.g., checking for ! * interrupts) periodically. ! */ ! if (++nmsgs > MaxMsgsPerXLogSend) ! { ! pending_xlog_send = true; ! break; ! } ! ! npages = 0; ! } ! if (ispartialpage) break; } --- 522,588 ---- * sending in the last page. We must initialize all of them to * keep the compiler quiet. */ while (XLByteLT(LogstreamResult.Send, SendRqstPtr)) { + XLogRecPtr startptr; + XLogRecPtr endptr; + Size nbytes; + uint8 flags = 0; + /* ! * Figure out how much to send in one message. If there's less than ! * MAX_SEND_SIZE bytes to send, send everything. Otherwise send ! * MAX_SEND_SIZE bytes, but round to page boundary for efficiency. */ ! startptr = LogstreamResult.Send; ! endptr = startptr; ! endptr.xrecoff += MAX_SEND_SIZE; ! if(endptr.xrecoff < startptr.xrecoff) ! endptr.xlogid++; /* xrecoff overflowed */ ! /* round down to page boundary */ ! endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ); ! if (XLByteLT(SendRqstPtr, endptr)) ! endptr = SendRqstPtr; /* ! * XXX: Should we request the standby to fsync the log if the ! * current set might include a shutdown checkpoint record? ! * ! * Heikki: Well, we don't do that with other checkpoints, I don't ! * see why we should at a shutdown checkpoint. However, perhaps ! * walreceiver should do an fsync whenever the connection is lost, ! * whatever the reason (e.g the master has been shut down) ? */ ! /* OK to read and send the log */ ! pq_beginasyncmsg(outMsg, 'w'); ! pq_sendint(outMsg->buf, flags, 1); ! pq_sendint(outMsg->buf, startptr.xlogid, 4); ! pq_sendint(outMsg->buf, startptr.xrecoff, 4); ! 
if (endptr.xlogid != startptr.xlogid) { ! Assert(endptr.xlogid == startptr.xlogid + 1); ! nbytes = (0xffffffff - endptr.xrecoff) + startptr.xrecoff; ! } ! else ! nbytes = endptr.xrecoff - startptr.xrecoff; ! LogstreamResult.Send = endptr; ! /* ! * Read the log into the output buffer directly to prevent ! * extra memcpy calls. ! */ ! XLogRead(BufferGetStringInfo(outMsg->buf, nbytes), startptr, nbytes); ! res = pq_endasyncmsg(outMsg); ! if (res < 0) ! return false; ! if (res == 0) break; }
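To illustrate the reworked splitting logic in XLogWalRcvWrite() and XLogRead() above, here is a simplified, stand-alone sketch of how a byte range can be cut at segment boundaries without any end-of-segment flag from the sender. It is not the patch's code: it uses a flat 64-bit WAL position instead of the xlogid/xrecoff pair, assumes a 16 MB segment size, and the Chunk type and split_by_segment() function are hypothetical names invented for this example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed segment size of 16 MB, for illustration only. */
#define SEG_SIZE ((uint64_t) 16 * 1024 * 1024)

/* One piece of a write (or read) that falls entirely within one segment. */
typedef struct Chunk
{
	uint64_t	segno;		/* segment the chunk belongs to */
	uint32_t	offset;		/* byte offset within that segment */
	size_t		len;		/* number of bytes in the chunk */
} Chunk;

/*
 * Split the range [pos, pos + nbytes) into per-segment chunks.
 * Stores at most max_chunks entries in 'out' and returns how many
 * were stored. The receiver derives the segment and offset purely
 * from the position, so no flag from the sender is needed.
 */
static size_t
split_by_segment(uint64_t pos, size_t nbytes, Chunk *out, size_t max_chunks)
{
	size_t		n = 0;

	assert(out != NULL);

	while (nbytes > 0 && n < max_chunks)
	{
		uint64_t	off = pos % SEG_SIZE;
		uint64_t	room = SEG_SIZE - off;	/* bytes left in this segment */
		size_t		len = (nbytes > room) ? (size_t) room : nbytes;

		out[n].segno = pos / SEG_SIZE;
		out[n].offset = (uint32_t) off;
		out[n].len = len;
		n++;

		pos += len;
		nbytes -= len;
	}
	return n;
}
```

For example, a 300-byte message that starts 100 bytes before the end of a segment would come out as two chunks: 100 bytes at the tail of that segment, then 200 bytes at offset 0 of the next one.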