Alvaro Herrera <alvhe...@2ndquadrant.com> wrote:

> On 2019-Nov-12, Antonin Houska wrote:
>
> > ok, the next version uses explicit lseek(). Maybe the fact that XLOG is
> > mostly read sequentially (i.e. without frequent seeks) is the reason
> > pread() hasn't been adopted so far.
>
> I don't quite understand why you backed off from switching to pread. It
> seemed a good change to me.

I agreed with Michael that it makes comparison of the old and new code more
difficult, and I also thought that his arguments about performance might be
worthwhile because WAL reading is mostly sequential and does not require many
seeks. However, things appear to be more complex, see below.

> Here's a few edits on top of your latest.
> ...

I agree with your renamings.

> Change xlr_seg to be a struct rather than pointer to struct. It seems a
> bit dangerous to me to return a pointer that we don't know is going to
> be valid at raise-error time. Struct assignment works fine for the
> purpose.

ok

> I would only like to switch this back to pg_pread() (from seek/read) and
> I'd be happy to commit this.

I realized that, starting from commit
709d003fbd98b975a4fbcb4c5750fa6efaf9ad87, we use the WALOpenSegment.ws_off
field incorrectly in walsender.c:XLogRead(). In that commit we used this field
to replace XLogReaderState.readOff:

@@ -156,10 +165,9 @@ struct XLogReaderState
     char       *readBuf;
     uint32      readLen;

-    /* last read segment, segment offset, TLI for data currently in readBuf */
-    XLogSegNo   readSegNo;
-    uint32      readOff;
-    TimeLineID  readPageTLI;
+    /* last read XLOG position for data currently in readBuf */
+    WALSegmentContext segcxt;
+    WALOpenSegment seg;

     /*
      * beginning of prior page read, and its TLI. Doesn't necessarily

Thus we cannot use it in XLogRead() to track the current position in the
segment file. Although walsender.c:XLogRead() misses this point, it's not
broken because walsender.c does not use XLogReaderState at all.

So if explicit lseek() should be used, another field should be added to
WALOpenSegment. I failed to do so when removing the pg_pread() call from the
patch, and that was the reason for the problem reported here:

https://www.postgresql.org/message-id/20191117042221.GA16537%40alvherre.pgsql
https://www.postgresql.org/message-id/20191120083802.gb47...@paquier.xyz

Thus the use of pg_pread() makes the code quite a bit simpler, so I
re-introduced it.
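
To illustrate the point about position tracking, here is a minimal sketch (not code from the patch). It assumes plain POSIX lseek()/read()/pread() as stand-ins for the PostgreSQL wrappers; read_with_seek(), read_positional(), the cur_off argument and make_demo_file() are hypothetical names introduced only for this example. With seek/read the caller has to carry a separate "current file position" variable, whereas pread() takes the offset as an argument:

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/*
 * Read 'len' bytes at 'off' using an explicit seek. '*cur_off' is the extra
 * state the caller must keep in sync with the kernel's file position.
 */
static ssize_t
read_with_seek(int fd, void *buf, size_t len, off_t off, off_t *cur_off)
{
    ssize_t     n;

    assert(cur_off != NULL);
    if (*cur_off != off)
    {
        if (lseek(fd, off, SEEK_SET) < 0)
            return -1;
        *cur_off = off;
    }
    n = read(fd, buf, len);
    if (n > 0)
        *cur_off += n;
    return n;
}

/* The same read with pread(): no position has to be remembered. */
static ssize_t
read_positional(int fd, void *buf, size_t len, off_t off)
{
    return pread(fd, buf, len, off);
}

/* Hypothetical demo helper: unlinked temp file holding 'contents'. */
static int
make_demo_file(const char *contents)
{
    char        path[] = "/tmp/pread_demo_XXXXXX";
    size_t      len = strlen(contents);
    int         fd = mkstemp(path);

    if (fd < 0)
        return -1;
    unlink(path);
    if (write(fd, contents, len) != (ssize_t) len ||
        lseek(fd, 0, SEEK_SET) < 0)
    {
        close(fd);
        return -1;
    }
    return fd;
}
```

A caller of read_with_seek() needs one such cur_off per open file, which is the role an additional WALOpenSegment field would have to play if lseek() were kept.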

If you decide that an explicit lseek() should be used after all, just let me
know.

> What is logical_read_local_xlog_page all about? Seems useless. Let's
> get rid of it.

It seems so. Should I post a patch for that?

-- 
Antonin Houska
Web: https://www.cybertec-postgresql.com

From 52c044cb0f2f27c8714e5a8e41eb0e6da4714fd5 Mon Sep 17 00:00:00 2001
From: Antonin Houska <a...@cybertec.at>
Date: Wed, 20 Nov 2019 15:11:48 +0100
Subject: [PATCH 1/2] Use only xlogreader.c:XLogRead()

The implementations in xlogutils.c and walsender.c are just renamed now, to be
removed by the following diff.
---
 src/backend/access/transam/xlogreader.c | 101 ++++++++++++++-
 src/backend/access/transam/xlogutils.c  |  85 ++++++++++--
 src/backend/replication/walsender.c     | 144 ++++++++++++++++++++-
 src/bin/pg_waldump/pg_waldump.c         | 164 +++++++-----------------
 src/include/access/xlogreader.h         |  36 ++++++
 src/include/access/xlogutils.h          |   2 +
 6 files changed, 399 insertions(+), 133 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 7f24f0cb95..0742fcad7f 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -17,6 +17,8 @@
  */
 #include "postgres.h"

+#include <unistd.h>
+
 #include "access/transam.h"
 #include "access/xlog_internal.h"
 #include "access/xlogreader.h"
@@ -27,6 +29,7 @@

 #ifndef FRONTEND
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "utils/memutils.h"
 #endif

@@ -295,8 +298,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
      * byte to cover the whole record header, or at least the part of it that
      * fits on the same page.
      */
-    readOff = ReadPageInternal(state,
-                               targetPagePtr,
+    readOff = ReadPageInternal(state, targetPagePtr,
                                Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
     if (readOff < 0)
         goto err;
@@ -1015,6 +1017,101 @@ out:

 #endif                          /* FRONTEND */

+/*
+ * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
+ * fetched from timeline 'tli'.
+ *
+ * 'seg/segcxt' identify the last segment used. 'openSegment' is a callback
+ * to open the next segment, if necessary.
+ *
+ * Returns true if succeeded, false if an error occurs, in which case
+ * 'errinfo' receives error details.
+ *
+ * XXX probably this should be improved to suck data directly from the
+ * WAL buffers when possible.
+ */
+bool
+WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
+        WALOpenSegment *seg, WALSegmentContext *segcxt,
+        WALSegmentOpen openSegment, WALReadError *errinfo)
+{
+    char       *p;
+    XLogRecPtr  recptr;
+    Size        nbytes;
+
+    p = buf;
+    recptr = startptr;
+    nbytes = count;
+
+    while (nbytes > 0)
+    {
+        uint32      startoff;
+        int         segbytes;
+        int         readbytes;
+
+        startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
+
+        if (seg->ws_file < 0 ||
+            !XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) ||
+            tli != seg->ws_tli)
+        {
+            XLogSegNo   nextSegNo;
+
+            /* Switch to another logfile segment */
+            if (seg->ws_file >= 0)
+                close(seg->ws_file);
+
+            XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
+
+            /* Open the next segment in the caller's way. */
+            seg->ws_file = openSegment(nextSegNo, segcxt, &tli);
+
+            /* Update the current segment info. */
+            seg->ws_tli = tli;
+            seg->ws_segno = nextSegNo;
+        }
+
+        /* How many bytes are within this segment? */
+        if (nbytes > (segcxt->ws_segsize - startoff))
+            segbytes = segcxt->ws_segsize - startoff;
+        else
+            segbytes = nbytes;
+
+#ifndef FRONTEND
+        pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+#endif
+
+        /*
+         * Failure to read the data does not necessarily imply non-zero errno.
+         * Set it to zero so that caller can distinguish the failure that does
+         * not affect errno.
+         */
+        errno = 0;
+
+        readbytes = pg_pread(seg->ws_file, p, segbytes, startoff);
+
+#ifndef FRONTEND
+        pgstat_report_wait_end();
+#endif
+
+        if (readbytes <= 0)
+        {
+            errinfo->wre_errno = errno;
+            errinfo->wre_req = segbytes;
+            errinfo->wre_read = readbytes;
+            errinfo->wre_seg = *seg;
+            return false;
+        }
+
+        /* Update state for read */
+        recptr += readbytes;
+        nbytes -= readbytes;
+        p += readbytes;
+    }
+
+    return true;
+}
+
 /* ----------------------------------------
  * Functions for decoding the data and block references in a record.
  * ----------------------------------------
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 5f1e5ba75d..baca17260c 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -653,8 +653,8 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
  * frontend). Probably these should be merged at some point.
  */
 static void
-XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
-         Size count)
+XLogReadOld(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
+            Size count)
 {
     char       *p;
     XLogRecPtr  recptr;
@@ -896,6 +896,36 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
     }
 }

+/*
+ * Callback for WALRead() to open the next segment.
+ */
+static int
+wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+                 TimeLineID *tli_p)
+{
+    TimeLineID  tli = *tli_p;
+    char        path[MAXPGPATH];
+    int         fd;
+
+    XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize);
+    fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+    if (fd >= 0)
+        return fd;
+
+    if (errno == ENOENT)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("requested WAL segment %s has already been removed",
+                        path)));
+    else
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not open file \"%s\": %m",
+                        path)));
+
+    return -1;                  /* keep compiler quiet */
+}
+
 /*
  * read_page callback for reading local xlog files
  *
@@ -913,7 +943,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 {
     XLogRecPtr  read_upto,
                 loc;
+    TimeLineID  tli;
     int         count;
+    WALReadError errinfo;

     loc = targetPagePtr + reqLen;

@@ -932,7 +964,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
             read_upto = GetFlushRecPtr();
         else
             read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
-        state->seg.ws_tli = ThisTimeLineID;
+        tli = ThisTimeLineID;

         /*
          * Check which timeline to get the record from.
@@ -982,14 +1014,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
                 read_upto = state->currTLIValidUntil;

             /*
-             * Setting ws_tli to our wanted record's TLI is slightly wrong;
-             * the page might begin on an older timeline if it contains a
-             * timeline switch, since its xlog segment will have been copied
-             * from the prior timeline. This is pretty harmless though, as
-             * nothing cares so long as the timeline doesn't go backwards. We
-             * should read the page header instead; FIXME someday.
+             * Setting tli to our wanted record's TLI is slightly wrong; the
+             * page might begin on an older timeline if it contains a timeline
+             * switch, since its xlog segment will have been copied from the
+             * prior timeline. This is pretty harmless though, as nothing
+             * cares so long as the timeline doesn't go backwards. We should
+             * read the page header instead; FIXME someday.
              */
-            state->seg.ws_tli = state->currTLI;
+            tli = state->currTLI;

             /* No need to wait on a historical timeline */
             break;
@@ -1020,9 +1052,38 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
      * as 'count', read the whole page anyway. It's guaranteed to be
      * zero-padded up to the page boundary if it's incomplete.
      */
-    XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr,
-             XLOG_BLCKSZ);
+    if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg,
+                 &state->segcxt, wal_segment_open, &errinfo))
+        WALReadRaiseError(&errinfo);

     /* number of valid bytes in the buffer */
     return count;
 }
+
+/*
+ * Backend-specific convenience code to handle read errors encountered by
+ * WALRead().
+ */
+void
+WALReadRaiseError(WALReadError *errinfo)
+{
+    WALOpenSegment *seg = &errinfo->wre_seg;
+    char       *fname = XLogFileNameP(seg->ws_tli, seg->ws_segno);
+
+    if (errinfo->wre_read < 0)
+    {
+        errno = errinfo->wre_errno;
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not read from log segment %s, offset %u, length %zu: %m",
+                        fname, seg->ws_off, (Size) errinfo->wre_req)));
+    }
+    else if (errinfo->wre_read == 0)
+    {
+        ereport(ERROR,
+                (errcode(ERRCODE_DATA_CORRUPTED),
+                 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
+                        fname, seg->ws_off, errinfo->wre_read,
+                        (Size) errinfo->wre_req)));
+    }
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 7f5671504f..11611072b0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -248,9 +248,13 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);

-static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count);
+static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+                             TimeLineID *tli_p);
+static void XLogReadOld(WALSegmentContext *segcxt, char *buf,
+                        XLogRecPtr startptr, Size count);
+

 /* Initialize walsender process before entering the main command loop */
 void
 InitWalSender(void)
@@ -766,6 +770,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 {
     XLogRecPtr  flushptr;
     int         count;
+    WALReadError errinfo;
+    XLogSegNo   segno;

     XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
     sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
@@ -786,7 +792,27 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
         count = flushptr - targetPagePtr;   /* part of the page available */

     /* now actually read the data, we know it's there */
-    XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ);
+    if (!WALRead(cur_page,
+                 targetPagePtr,
+                 XLOG_BLCKSZ,
+                 sendSeg->ws_tli,   /* Pass the current TLI because only
+                                     * WalSndSegmentOpen controls whether new
+                                     * TLI is needed. */
+                 sendSeg,
+                 sendCxt,
+                 WalSndSegmentOpen,
+                 &errinfo))
+        WALReadRaiseError(&errinfo);
+
+    /*
+     * After reading into the buffer, check that what we read was valid. We do
+     * this after reading, because even though the segment was present when we
+     * opened it, it might get recycled or removed while we read it. The
+     * read() succeeds in that case, but the data we tried to read might
+     * already have been overwritten with new WAL records.
+     */
+    XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize);
+    CheckXLogRemoved(segno, sendSeg->ws_tli);

     return count;
 }
@@ -2362,7 +2388,7 @@ WalSndKill(int code, Datum arg)
  * more than one.
  */
 static void
-XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
+XLogReadOld(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
 {
     char       *p;
     XLogRecPtr  recptr;
@@ -2535,6 +2561,72 @@ retry:
     }
 }

+/*
+ * Callback for XLogRead() to open the next segment.
+ */
+int
+WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+                  TimeLineID *tli_p)
+{
+    char        path[MAXPGPATH];
+    int         fd;
+
+    /*-------
+     * When reading from a historic timeline, and there is a timeline switch
+     * within this segment, read from the WAL segment belonging to the new
+     * timeline.
+     *
+     * For example, imagine that this server is currently on timeline 5, and
+     * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
+     * 0/13002088. In pg_wal, we have these files:
+     *
+     * ...
+     * 000000040000000000000012
+     * 000000040000000000000013
+     * 000000050000000000000013
+     * 000000050000000000000014
+     * ...
+     *
+     * In this situation, when requested to send the WAL from segment 0x13, on
+     * timeline 4, we read the WAL from file 000000050000000000000013. Archive
+     * recovery prefers files from newer timelines, so if the segment was
+     * restored from the archive on this server, the file belonging to the old
+     * timeline, 000000040000000000000013, might not exist. Their contents are
+     * equal up to the switchpoint, because at a timeline switch, the used
+     * portion of the old segment is copied to the new file. -------
+     */
+    *tli_p = sendTimeLine;
+    if (sendTimeLineIsHistoric)
+    {
+        XLogSegNo   endSegNo;
+
+        XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
+        if (sendSeg->ws_segno == endSegNo)
+            *tli_p = sendTimeLineNextTLI;
+    }
+
+    XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
+    fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+    if (fd >= 0)
+        return fd;
+
+    /*
+     * If the file is not found, assume it's because the standby asked for a
+     * too old WAL segment that has already been removed or recycled.
+     */
+    if (errno == ENOENT)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("requested WAL segment %s has already been removed",
+                        XLogFileNameP(*tli_p, nextSegNo))));
+    else
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not open file \"%s\": %m",
+                        path)));
+    return -1;                  /* keep compiler quiet */
+}
+
 /*
  * Send out the WAL in its normal physical/stored form.
  *
@@ -2552,6 +2644,8 @@ XLogSendPhysical(void)
     XLogRecPtr  startptr;
     XLogRecPtr  endptr;
     Size        nbytes;
+    XLogSegNo   segno;
+    WALReadError errinfo;

     /* If requested switch the WAL sender to the stopping state. */
     if (got_STOPPING)
@@ -2767,7 +2861,49 @@ XLogSendPhysical(void)
      * calls.
      */
     enlargeStringInfo(&output_message, nbytes);
-    XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes);
+
+retry:
+    if (!WALRead(&output_message.data[output_message.len],
+                 startptr,
+                 nbytes,
+                 sendSeg->ws_tli,   /* Pass the current TLI because only
+                                     * WalSndSegmentOpen controls whether new
+                                     * TLI is needed. */
+                 sendSeg,
+                 sendCxt,
+                 WalSndSegmentOpen,
+                 &errinfo))
+        WALReadRaiseError(&errinfo);
+
+    /* See logical_read_xlog_page(). */
+    XLByteToSeg(startptr, segno, sendCxt->ws_segsize);
+    CheckXLogRemoved(segno, sendSeg->ws_tli);
+
+    /*
+     * During recovery, the currently-open WAL file might be replaced with the
+     * file of the same name retrieved from archive. So we always need to
+     * check what we read was valid after reading into the buffer. If it's
+     * invalid, we try to open and read the file again.
+     */
+    if (am_cascading_walsender)
+    {
+        WalSnd     *walsnd = MyWalSnd;
+        bool        reload;
+
+        SpinLockAcquire(&walsnd->mutex);
+        reload = walsnd->needreload;
+        walsnd->needreload = false;
+        SpinLockRelease(&walsnd->mutex);
+
+        if (reload && sendSeg->ws_file >= 0)
+        {
+            close(sendSeg->ws_file);
+            sendSeg->ws_file = -1;
+
+            goto retry;
+        }
+    }
+
     output_message.len += nbytes;
     output_message.data[output_message.len] = '\0';

diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 1524e5eb1e..d389df00b9 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -280,137 +280,56 @@ identify_target_directory(char *directory, char *fname)
     return NULL;                /* not reached */
 }

-/*
- * Read count bytes from a segment file in the specified directory, for the
- * given timeline, containing the specified record pointer; store the data in
- * the passed buffer.
- */
-static void
-XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
-                 XLogRecPtr startptr, char *buf, Size count)
+static int
+WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+                   TimeLineID *tli_p)
 {
-    char       *p;
-    XLogRecPtr  recptr;
-    Size        nbytes;
-
-    static int  sendFile = -1;
-    static XLogSegNo sendSegNo = 0;
-    static uint32 sendOff = 0;
+    TimeLineID  tli = *tli_p;
+    char        fname[MAXPGPATH];
+    int         fd;
+    int         tries;

-    p = buf;
-    recptr = startptr;
-    nbytes = count;
+    XLogFileName(fname, tli, nextSegNo, segcxt->ws_segsize);

-    while (nbytes > 0)
+    /*
+     * In follow mode there is a short period of time after the server has
+     * written the end of the previous file before the new file is available.
+     * So we loop for 5 seconds looking for the file to appear before giving
+     * up.
+     */
+    for (tries = 0; tries < 10; tries++)
     {
-        uint32      startoff;
-        int         segbytes;
-        int         readbytes;
-
-        startoff = XLogSegmentOffset(recptr, WalSegSz);
-
-        if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, WalSegSz))
-        {
-            char        fname[MAXFNAMELEN];
-            int         tries;
-
-            /* Switch to another logfile segment */
-            if (sendFile >= 0)
-                close(sendFile);
-
-            XLByteToSeg(recptr, sendSegNo, WalSegSz);
-
-            XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-            /*
-             * In follow mode there is a short period of time after the server
-             * has written the end of the previous file before the new file is
-             * available. So we loop for 5 seconds looking for the file to
-             * appear before giving up.
-             */
-            for (tries = 0; tries < 10; tries++)
-            {
-                sendFile = open_file_in_directory(directory, fname);
-                if (sendFile >= 0)
-                    break;
-                if (errno == ENOENT)
-                {
-                    int         save_errno = errno;
-
-                    /* File not there yet, try again */
-                    pg_usleep(500 * 1000);
-
-                    errno = save_errno;
-                    continue;
-                }
-                /* Any other error, fall through and fail */
-                break;
-            }
-
-            if (sendFile < 0)
-                fatal_error("could not find file \"%s\": %s",
-                            fname, strerror(errno));
-            sendOff = 0;
-        }
-
-        /* Need to seek in the file? */
-        if (sendOff != startoff)
-        {
-            if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-            {
-                int         err = errno;
-                char        fname[MAXPGPATH];
-
-                XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-
-                fatal_error("could not seek in log file %s to offset %u: %s",
-                            fname, startoff, strerror(err));
-            }
-            sendOff = startoff;
-        }
-
-        /* How many bytes are within this segment? */
-        if (nbytes > (WalSegSz - startoff))
-            segbytes = WalSegSz - startoff;
-        else
-            segbytes = nbytes;
-
-        readbytes = read(sendFile, p, segbytes);
-        if (readbytes <= 0)
+        fd = open_file_in_directory(segcxt->ws_dir, fname);
+        if (fd >= 0)
+            return fd;
+        if (errno == ENOENT)
         {
-            int         err = errno;
-            char        fname[MAXPGPATH];
             int         save_errno = errno;

-            XLogFileName(fname, timeline_id, sendSegNo, WalSegSz);
-            errno = save_errno;
+            /* File not there yet, try again */
+            pg_usleep(500 * 1000);

-            if (readbytes < 0)
-                fatal_error("could not read from log file %s, offset %u, length %d: %s",
-                            fname, sendOff, segbytes, strerror(err));
-            else if (readbytes == 0)
-                fatal_error("could not read from log file %s, offset %u: read %d of %zu",
-                            fname, sendOff, readbytes, (Size) segbytes);
+            errno = save_errno;
+            continue;
         }
-
-        /* Update state for read */
-        recptr += readbytes;
-
-        sendOff += readbytes;
-        nbytes -= readbytes;
-        p += readbytes;
+        /* Any other error, fall through and fail */
+        break;
     }
+
+    fatal_error("could not find file \"%s\": %s", fname, strerror(errno));
+    return -1;                  /* keep compiler quiet */
 }

 /*
  * XLogReader read_page callback
  */
 static int
-XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
-                 XLogRecPtr targetPtr, char *readBuff)
+WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
+                XLogRecPtr targetPtr, char *readBuff)
 {
     XLogDumpPrivate *private = state->private_data;
     int         count = XLOG_BLCKSZ;
+    WALReadError errinfo;

     if (private->endptr != InvalidXLogRecPtr)
     {
@@ -425,8 +344,23 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
         }
     }

-    XLogDumpXLogRead(state->segcxt.ws_dir, private->timeline, targetPagePtr,
-                     readBuff, count);
+    if (!WALRead(readBuff, targetPagePtr, count, private->timeline,
+                 &state->seg, &state->segcxt, WALDumpOpenSegment, &errinfo))
+    {
+        WALOpenSegment *seg = &errinfo.wre_seg;
+        char        fname[MAXPGPATH];
+
+        XLogFileName(fname, seg->ws_tli, seg->ws_segno,
+                     state->segcxt.ws_segsize);
+
+        if (errinfo.wre_errno != 0)
+            fatal_error("could not read in file %s, offset %u, length %zu: %s",
+                        fname, seg->ws_off, (Size) errinfo.wre_req,
+                        strerror(errinfo.wre_errno));
+        else
+            fatal_error("could not read in file %s, offset %u: length: %zu",
+                        fname, seg->ws_off, (Size) errinfo.wre_req);
+    }

     return count;
 }
@@ -1089,7 +1023,7 @@ main(int argc, char **argv)
     /* done with argument parsing, do the actual work */

     /* we have everything we need, start reading */
-    xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, XLogDumpReadPage,
+    xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, WALDumpReadPage,
                                           &private);
     if (!xlogreader_state)
         fatal_error("out of memory");
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 1bbee386e8..33273e7327 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -217,6 +217,24 @@ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
 /* Free an XLogReader */
 extern void XLogReaderFree(XLogReaderState *state);

+/*
+ * Callback to open the specified WAL segment for reading. Returns a valid
+ * file descriptor when the file was opened successfully.
+ *
+ * "nextSegNo" is the number of the segment to be opened.
+ *
+ * "segcxt" is additional information about the segment.
+ *
+ * "tli_p" is an input/output argument. XLogRead() uses it to pass the
+ * timeline in which the new segment should be found, but the callback can use
+ * it to return the TLI that it actually opened.
+ *
+ * BasicOpenFile() is the preferred way to open the segment file in backend
+ * code, whereas open(2) should be used in frontend.
+ */
+typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+                               TimeLineID *tli_p);
+
 /* Initialize supporting structures */
 extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
                                int segsize, const char *waldir);
@@ -232,6 +250,24 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
 #ifdef FRONTEND
 extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
 #endif                          /* FRONTEND */
+
+/*
+ * Error information from WALRead that both backend and frontend caller can
+ * process.
+ */
+typedef struct WALReadError
+{
+    int         wre_errno;      /* errno set by the last read() / lseek() */
+    int         wre_read;       /* Bytes read by the last read(). */
+    int         wre_req;        /* Bytes requested to be read. */
+    WALOpenSegment wre_seg;     /* Segment we tried to read from. */
+} WALReadError;
+
+extern bool WALRead(char *buf, XLogRecPtr startptr, Size count,
+                    TimeLineID tli, WALOpenSegment *seg,
+                    WALSegmentContext *segcxt, WALSegmentOpen openSegment,
+                    WALReadError *errinfo);
+
 /* Functions for decoding an XLogRecord */

 extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 2df98e45b2..3fe5b36748 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -54,4 +54,6 @@ extern int read_local_xlog_page(XLogReaderState *state,
 extern void XLogReadDetermineTimeline(XLogReaderState *state,
                                       XLogRecPtr wantPage, uint32 wantLength);

+void WALReadRaiseError(WALReadError *errinfo);
+
 #endif
-- 
2.20.1

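
The shape of WALRead() in the patch above (a read loop that crosses segment boundaries, a caller-supplied open callback, and an error struct instead of direct error reporting) can be sketched outside PostgreSQL as follows. This is only an illustration under stated assumptions: DemoSegment, DemoReadError, DemoSegmentOpen, demo_open(), demo_write_segment(), demo_read() and the seg_NNNN file naming are all hypothetical, plain pread() stands in for pg_pread(), and unlike the backend callbacks in the patch the demo callback returns -1 on failure rather than raising an error.

```c
#define _XOPEN_SOURCE 700
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

typedef struct DemoSegment      /* hypothetical stand-in for WALOpenSegment */
{
    int         fd;             /* -1 while no segment is open */
    uint64_t    segno;          /* number of the open segment */
} DemoSegment;

typedef struct DemoReadError    /* hypothetical stand-in for WALReadError */
{
    int         err;            /* errno of the failed call, 0 on EOF */
    int         req;            /* bytes requested */
    int         nread;          /* result of the last read, -1 if none */
    uint64_t    segno;          /* segment we tried to read from */
} DemoReadError;

/* Callback type: open segment 'segno' found in 'dir', return fd or -1. */
typedef int (*DemoSegmentOpen) (uint64_t segno, const char *dir);

static int
demo_open(uint64_t segno, const char *dir)
{
    char        path[1024];

    snprintf(path, sizeof(path), "%s/seg_%04llu", dir,
             (unsigned long long) segno);
    return open(path, O_RDONLY);
}

/* Demo helper: create one segment file with the given contents. */
static int
demo_write_segment(const char *dir, uint64_t segno, const char *data,
                   size_t len)
{
    char        path[1024];
    int         fd;
    ssize_t     n;

    snprintf(path, sizeof(path), "%s/seg_%04llu", dir,
             (unsigned long long) segno);
    fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0600);
    if (fd < 0)
        return -1;
    n = write(fd, data, len);
    close(fd);
    return (n == (ssize_t) len) ? 0 : -1;
}

/* Read 'count' bytes at logical position 'start'; 1 on success, 0 on error. */
static int
demo_read(char *buf, uint64_t start, size_t count, uint32_t segsize,
          const char *dir, DemoSegment *seg, DemoSegmentOpen open_cb,
          DemoReadError *errinfo)
{
    assert(segsize > 0);
    while (count > 0)
    {
        uint64_t    segno = start / segsize;
        uint32_t    off = (uint32_t) (start % segsize);
        size_t      segbytes = segsize - off;
        ssize_t     n = -1;

        if (segbytes > count)
            segbytes = count;
        /* Switch segments through the caller's callback when needed. */
        if (seg->fd < 0 || seg->segno != segno)
        {
            if (seg->fd >= 0)
                close(seg->fd);
            seg->fd = open_cb(segno, dir);
            seg->segno = segno;
        }
        /* Reset errno so that EOF (n == 0) is reported with err == 0. */
        if (seg->fd >= 0)
        {
            errno = 0;
            n = pread(seg->fd, buf, segbytes, (off_t) off);
        }
        if (n <= 0)
        {
            errinfo->err = errno;
            errinfo->req = (int) segbytes;
            errinfo->nread = (int) n;
            errinfo->segno = segno;
            return 0;
        }
        start += (uint64_t) n;
        count -= (size_t) n;
        buf += n;
    }
    return 1;
}
```

Returning the error details in a struct is what lets the same loop serve both a caller that would ereport() and one that would call fatal_error(); each caller decides how to report after demo_read() returns 0.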
From ab0bc03480c0b299e1feca38ccfa4c1ec44c9c7c Mon Sep 17 00:00:00 2001
From: Antonin Houska <a...@cybertec.at>
Date: Wed, 20 Nov 2019 15:11:48 +0100
Subject: [PATCH 2/2] Remove the old implementations of XLogRead().

Done in a separate patch because the diff looks harder to read if one function
(XLogRead) is removed and another one (the WALSegmentOpen callback) is added
nearby at the same time (the addition and removal of code can get mixed in the
diff).
---
 src/backend/access/transam/xlogutils.c | 122 ----------------
 src/backend/replication/walsender.c    | 188 -------------------------
 2 files changed, 310 deletions(-)

diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index baca17260c..a16d47f156 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -639,128 +639,6 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
     forget_invalid_pages(rnode, forkNum, nblocks);
 }

-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- * in timeline 'tli'.
- *
- * Will open, and keep open, one WAL segment stored in the static file
- * descriptor 'sendFile'. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- *
- * XXX This is very similar to pg_waldump's XLogDumpXLogRead and to XLogRead
- * in walsender.c but for small differences (such as lack of elog() in
- * frontend). Probably these should be merged at some point.
- */
-static void
-XLogReadOld(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
-            Size count)
-{
-    char       *p;
-    XLogRecPtr  recptr;
-    Size        nbytes;
-
-    /* state maintained across calls */
-    static int  sendFile = -1;
-    static XLogSegNo sendSegNo = 0;
-    static TimeLineID sendTLI = 0;
-    static uint32 sendOff = 0;
-
-    Assert(segsize == wal_segment_size);
-
-    p = buf;
-    recptr = startptr;
-    nbytes = count;
-
-    while (nbytes > 0)
-    {
-        uint32      startoff;
-        int         segbytes;
-        int         readbytes;
-
-        startoff = XLogSegmentOffset(recptr, segsize);
-
-        /* Do we need to switch to a different xlog segment? */
-        if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, segsize) ||
-            sendTLI != tli)
-        {
-            char        path[MAXPGPATH];
-
-            if (sendFile >= 0)
-                close(sendFile);
-
-            XLByteToSeg(recptr, sendSegNo, segsize);
-
-            XLogFilePath(path, tli, sendSegNo, segsize);
-
-            sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-
-            if (sendFile < 0)
-            {
-                if (errno == ENOENT)
-                    ereport(ERROR,
-                            (errcode_for_file_access(),
-                             errmsg("requested WAL segment %s has already been removed",
-                                    path)));
-                else
-                    ereport(ERROR,
-                            (errcode_for_file_access(),
-                             errmsg("could not open file \"%s\": %m",
-                                    path)));
-            }
-            sendOff = 0;
-            sendTLI = tli;
-        }
-
-        /* Need to seek in the file? */
-        if (sendOff != startoff)
-        {
-            if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
-            {
-                char        path[MAXPGPATH];
-                int         save_errno = errno;
-
-                XLogFilePath(path, tli, sendSegNo, segsize);
-                errno = save_errno;
-                ereport(ERROR,
-                        (errcode_for_file_access(),
-                         errmsg("could not seek in log segment %s to offset %u: %m",
-                                path, startoff)));
-            }
-            sendOff = startoff;
-        }
-
-        /* How many bytes are within this segment? */
-        if (nbytes > (segsize - startoff))
-            segbytes = segsize - startoff;
-        else
-            segbytes = nbytes;
-
-        pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-        readbytes = read(sendFile, p, segbytes);
-        pgstat_report_wait_end();
-        if (readbytes <= 0)
-        {
-            char        path[MAXPGPATH];
-            int         save_errno = errno;
-
-            XLogFilePath(path, tli, sendSegNo, segsize);
-            errno = save_errno;
-            ereport(ERROR,
-                    (errcode_for_file_access(),
-                     errmsg("could not read from log segment %s, offset %u, length %lu: %m",
-                            path, sendOff, (unsigned long) segbytes)));
-        }
-
-        /* Update state for read */
-        recptr += readbytes;
-
-        sendOff += readbytes;
-        nbytes -= readbytes;
-        p += readbytes;
-    }
-}
-
 /*
  * Determine which timeline to read an xlog page from and set the
  * XLogReaderState's currTLI to that timeline ID.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 11611072b0..d2426b0960 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -252,9 +252,6 @@ static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
                              TimeLineID *tli_p);
-static void XLogReadOld(WALSegmentContext *segcxt, char *buf,
-                        XLogRecPtr startptr, Size count);
-

 /* Initialize walsender process before entering the main command loop */
 void
 InitWalSender(void)
@@ -2376,191 +2373,6 @@ WalSndKill(int code, Datum arg)
     SpinLockRelease(&walsnd->mutex);
 }

-/*
- * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
- *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
- *
- * Will open, and keep open, one WAL segment stored in the global file
- * descriptor sendFile. This means if XLogRead is used once, there will
- * always be one descriptor left open until the process ends, but never
- * more than one.
- */
-static void
-XLogReadOld(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
-{
-    char       *p;
-    XLogRecPtr  recptr;
-    Size        nbytes;
-    XLogSegNo   segno;
-
-retry:
-    p = buf;
-    recptr = startptr;
-    nbytes = count;
-
-    while (nbytes > 0)
-    {
-        uint32      startoff;
-        int         segbytes;
-        int         readbytes;
-
-        startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
-
-        if (sendSeg->ws_file < 0 ||
-            !XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize))
-        {
-            char        path[MAXPGPATH];
-
-            /* Switch to another logfile segment */
-            if (sendSeg->ws_file >= 0)
-                close(sendSeg->ws_file);
-
-            XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize);
-
-            /*-------
-             * When reading from a historic timeline, and there is a timeline
-             * switch within this segment, read from the WAL segment belonging
-             * to the new timeline.
-             *
-             * For example, imagine that this server is currently on timeline
-             * 5, and we're streaming timeline 4. The switch from timeline 4
-             * to 5 happened at 0/13002088. In pg_wal, we have these files:
-             *
-             * ...
-             * 000000040000000000000012
-             * 000000040000000000000013
-             * 000000050000000000000013
-             * 000000050000000000000014
-             * ...
-             *
-             * In this situation, when requested to send the WAL from
-             * segment 0x13, on timeline 4, we read the WAL from file
-             * 000000050000000000000013. Archive recovery prefers files from
-             * newer timelines, so if the segment was restored from the
-             * archive on this server, the file belonging to the old timeline,
-             * 000000040000000000000013, might not exist. Their contents are
-             * equal up to the switchpoint, because at a timeline switch, the
-             * used portion of the old segment is copied to the new file.
-             *-------
-             */
-            sendSeg->ws_tli = sendTimeLine;
-            if (sendTimeLineIsHistoric)
-            {
-                XLogSegNo   endSegNo;
-
-                XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
-                if (sendSeg->ws_segno == endSegNo)
-                    sendSeg->ws_tli = sendTimeLineNextTLI;
-            }
-
-            XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize);
-
-            sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-            if (sendSeg->ws_file < 0)
-            {
-                /*
-                 * If the file is not found, assume it's because the standby
-                 * asked for a too old WAL segment that has already been
-                 * removed or recycled.
-                 */
-                if (errno == ENOENT)
-                    ereport(ERROR,
-                            (errcode_for_file_access(),
-                             errmsg("requested WAL segment %s has already been removed",
-                                    XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno))));
-                else
-                    ereport(ERROR,
-                            (errcode_for_file_access(),
-                             errmsg("could not open file \"%s\": %m",
-                                    path)));
-            }
-            sendSeg->ws_off = 0;
-        }
-
-        /* Need to seek in the file? */
-        if (sendSeg->ws_off != startoff)
-        {
-            if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0)
-                ereport(ERROR,
-                        (errcode_for_file_access(),
-                         errmsg("could not seek in log segment %s to offset %u: %m",
-                                XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-                                startoff)));
-            sendSeg->ws_off = startoff;
-        }
-
-        /* How many bytes are within this segment? */
-        if (nbytes > (segcxt->ws_segsize - startoff))
-            segbytes = segcxt->ws_segsize - startoff;
-        else
-            segbytes = nbytes;
-
-        pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-        readbytes = read(sendSeg->ws_file, p, segbytes);
-        pgstat_report_wait_end();
-        if (readbytes < 0)
-        {
-            ereport(ERROR,
-                    (errcode_for_file_access(),
-                     errmsg("could not read from log segment %s, offset %u, length %zu: %m",
-                            XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-                            sendSeg->ws_off, (Size) segbytes)));
-        }
-        else if (readbytes == 0)
-        {
-            ereport(ERROR,
-                    (errcode(ERRCODE_DATA_CORRUPTED),
-                     errmsg("could not read from log segment %s, offset %u: read %d of %zu",
-                            XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
-                            sendSeg->ws_off, readbytes, (Size) segbytes)));
-        }
-
-        /* Update state for read */
-        recptr += readbytes;
-
-        sendSeg->ws_off += readbytes;
-        nbytes -= readbytes;
-        p += readbytes;
-    }
-
-    /*
-     * After reading into the buffer, check that what we read was valid. We do
-     * this after reading, because even though the segment was present when we
-     * opened it, it might get recycled or removed while we read it. The
-     * read() succeeds in that case, but the data we tried to read might
-     * already have been overwritten with new WAL records.
-     */
-    XLByteToSeg(startptr, segno, segcxt->ws_segsize);
-    CheckXLogRemoved(segno, ThisTimeLineID);
-
-    /*
-     * During recovery, the currently-open WAL file might be replaced with the
-     * file of the same name retrieved from archive. So we always need to
-     * check what we read was valid after reading into the buffer. If it's
-     * invalid, we try to open and read the file again.
-     */
-    if (am_cascading_walsender)
-    {
-        WalSnd     *walsnd = MyWalSnd;
-        bool        reload;
-
-        SpinLockAcquire(&walsnd->mutex);
-        reload = walsnd->needreload;
-        walsnd->needreload = false;
-        SpinLockRelease(&walsnd->mutex);
-
-        if (reload && sendSeg->ws_file >= 0)
-        {
-            close(sendSeg->ws_file);
-            sendSeg->ws_file = -1;
-
-            goto retry;
-        }
-    }
-}
-
 /*
  * Callback for XLogRead() to open the next segment.
  */
-- 
2.20.1