This is the next version. -- Antonin Houska Web: https://www.cybertec-postgresql.com
>From 01f5cc8b0c1e6133e16242ec99957a78551058a7 Mon Sep 17 00:00:00 2001 From: Antonin Houska <a...@cybertec.at> Date: Fri, 4 Oct 2019 12:07:22 +0200 Subject: [PATCH 1/2] Use only xlogreader.c:XLogRead() The implementations in xlogutils.c and walsender.c are just renamed now, to be removed by the following diff. --- src/backend/access/transam/xlogreader.c | 141 ++++++++++++++++++++++- src/backend/access/transam/xlogutils.c | 61 ++++++++-- src/backend/replication/walsender.c | 143 +++++++++++++++++++++++- src/bin/pg_waldump/pg_waldump.c | 60 +++++++++- src/include/access/xlogreader.h | 42 +++++++ 5 files changed, 428 insertions(+), 19 deletions(-) diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index c8b0d2303d..3e2167ca5a 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -17,6 +17,8 @@ */ #include "postgres.h" +#include <unistd.h> + #include "access/transam.h" #include "access/xlogrecord.h" #include "access/xlog_internal.h" @@ -27,6 +29,7 @@ #ifndef FRONTEND #include "miscadmin.h" +#include "pgstat.h" #include "utils/memutils.h" #endif @@ -626,7 +629,13 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr)) goto err; - /* update read state information */ + /* + * Update read state information. + * + * Note that XLogRead(), if used, should have updated the "seg" too for + * its own reasons, however we cannot rely on ->read_page() to call + * XLogRead(). + */ state->seg.ws_segno = targetSegNo; state->seg.ws_off = targetPageOff; state->readLen = readLen; @@ -1016,6 +1025,136 @@ out: #endif /* FRONTEND */ +/* + * Read 'count' bytes from WAL fetched from timeline 'tli' into 'buf', + * starting at location 'startptr'. 'seg' is the last segment used, + * 'openSegment' is a callback to open the next segment if needed and + * 'segcxt' is additional segment info that does not fit into 'seg'. 
+ * + * 'errinfo' should point to XLogReadError structure which will receive error + * details in case the read fails. + * + * Returns true if succeeded, false if failed. + * + * XXX probably this should be improved to suck data directly from the + * WAL buffers when possible. + */ +bool +XLogRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, + WALOpenSegment *seg, WALSegmentContext *segcxt, + WALSegmentOpen openSegment, XLogReadError *errinfo) +{ + char *p; + XLogRecPtr recptr; + Size nbytes; + + p = buf; + recptr = startptr; + nbytes = count; + + while (nbytes > 0) + { + int segbytes; + int readbytes; + + seg->ws_off = XLogSegmentOffset(recptr, segcxt->ws_segsize); + + if (seg->ws_file < 0 || + !XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) || + tli != seg->ws_tli) + { + XLogSegNo nextSegNo; + + /* Switch to another logfile segment */ + if (seg->ws_file >= 0) + close(seg->ws_file); + + XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize); + + /* Open the next segment in the caller's way. */ + openSegment(nextSegNo, segcxt, &tli, &seg->ws_file); + + /* Update the current segment info. */ + seg->ws_tli = tli; + seg->ws_segno = nextSegNo; + seg->ws_off = 0; + } + + /* How many bytes are within this segment? */ + if (nbytes > (segcxt->ws_segsize - seg->ws_off)) + segbytes = segcxt->ws_segsize - seg->ws_off; + else + segbytes = nbytes; + +#ifndef FRONTEND + pgstat_report_wait_start(WAIT_EVENT_WAL_READ); +#endif + + /* + * Failure to read the data does not necessarily imply non-zero errno. + * Set it to zero so that caller can distinguish the failure that does + * not affect errno. 
+ */ + errno = 0; + + readbytes = pg_pread(seg->ws_file, p, segbytes, seg->ws_off); + +#ifndef FRONTEND + pgstat_report_wait_end(); +#endif + + if (readbytes <= 0) + { + errinfo->read_errno = errno; + errinfo->readbytes = readbytes; + errinfo->reqbytes = segbytes; + errinfo->seg = seg; + return false; + } + + /* Update state for read */ + recptr += readbytes; + nbytes -= readbytes; + p += readbytes; + + /* Update the current segment info. */ + seg->ws_off += readbytes; + } + + return true; +} + +#ifndef FRONTEND +/* + * Backend-specific convenience code to handle read errors encountered by + * XLogRead(). + */ +void +XLogReadProcessError(XLogReadError *errinfo) +{ + WALOpenSegment *seg = errinfo->seg; + + if (errinfo->readbytes < 0) + { + errno = errinfo->read_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from log segment %s, offset %u, length %zu: %m", + XLogFileNameP(seg->ws_tli, seg->ws_segno), + seg->ws_off, (Size) errinfo->reqbytes))); + } + else + { + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read from log segment %s, offset %u: length %zu", + XLogFileNameP(seg->ws_tli, seg->ws_segno), + seg->ws_off, + (Size) errinfo->reqbytes))); + } +} +#endif + /* ---------------------------------------- * Functions for decoding the data and block references in a record. * ---------------------------------------- diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 5f1e5ba75d..007974ea99 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -653,8 +653,8 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, * frontend). Probably these should be merged at some point. 
*/ static void -XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr, - Size count) +XLogReadOld(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr, + Size count) { char *p; XLogRecPtr recptr; @@ -896,6 +896,39 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa } } +/* + * Callback for XLogRead() to open the next segment. + */ +static void +read_local_xlog_page_segment_open(XLogSegNo nextSegNo, + WALSegmentContext *segcxt, + TimeLineID *tli_p, + int *file_p) +{ + TimeLineID tli = *tli_p; + char path[MAXPGPATH]; + int file; + + XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize); + file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + + if (file < 0) + { + if (errno == ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("requested WAL segment %s has already been removed", + path))); + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); + } + + *file_p = file; +} + /* * read_page callback for reading local xlog files * @@ -913,7 +946,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, { XLogRecPtr read_upto, loc; + TimeLineID tli; int count; + XLogReadError errinfo; loc = targetPagePtr + reqLen; @@ -932,7 +967,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, read_upto = GetFlushRecPtr(); else read_upto = GetXLogReplayRecPtr(&ThisTimeLineID); - state->seg.ws_tli = ThisTimeLineID; + tli = ThisTimeLineID; /* * Check which timeline to get the record from. @@ -982,14 +1017,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, read_upto = state->currTLIValidUntil; /* - * Setting ws_tli to our wanted record's TLI is slightly wrong; - * the page might begin on an older timeline if it contains a - * timeline switch, since its xlog segment will have been copied - * from the prior timeline. 
This is pretty harmless though, as - * nothing cares so long as the timeline doesn't go backwards. We - * should read the page header instead; FIXME someday. + * Setting tli to our wanted record's TLI is slightly wrong; the + * page might begin on an older timeline if it contains a timeline + * switch, since its xlog segment will have been copied from the + * prior timeline. This is pretty harmless though, as nothing + * cares so long as the timeline doesn't go backwards. We should + * read the page header instead; FIXME someday. */ - state->seg.ws_tli = state->currTLI; + tli = state->currTLI; /* No need to wait on a historical timeline */ break; @@ -1020,8 +1055,10 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * as 'count', read the whole page anyway. It's guaranteed to be * zero-padded up to the page boundary if it's incomplete. */ - XLogRead(cur_page, state->segcxt.ws_segsize, state->seg.ws_tli, targetPagePtr, - XLOG_BLCKSZ); + if (!XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg, + &state->segcxt, read_local_xlog_page_segment_open, + &errinfo)) + XLogReadProcessError(&errinfo); /* number of valid bytes in the buffer */ return count; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index eb4a98cc91..b21b143f99 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -248,9 +248,13 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); -static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count); +static void WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, + TimeLineID *tli_p, int *file_p); +static void XLogReadOld(WALSegmentContext *segcxt, char *buf, + XLogRecPtr startptr, Size count); + /* Initialize walsender process before 
entering the main command loop */ void InitWalSender(void) @@ -766,6 +770,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req { XLogRecPtr flushptr; int count; + XLogReadError errinfo; + XLogSegNo segno; XLogReadDetermineTimeline(state, targetPagePtr, reqLen); sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID); @@ -786,7 +792,27 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req count = flushptr - targetPagePtr; /* part of the page available */ /* now actually read the data, we know it's there */ - XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ); + if (!XLogRead(cur_page, + targetPagePtr, + XLOG_BLCKSZ, + sendSeg->ws_tli, /* Pass the current TLI because only + * WalSndSegmentOpen controls whether new + * TLI is needed. */ + sendSeg, + sendCxt, + WalSndSegmentOpen, + &errinfo)) + XLogReadProcessError(&errinfo); + + /* + * After reading into the buffer, check that what we read was valid. We do + * this after reading, because even though the segment was present when we + * opened it, it might get recycled or removed while we read it. The + * read() succeeds in that case, but the data we tried to read might + * already have been overwritten with new WAL records. + */ + XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize); + CheckXLogRemoved(segno, sendSeg->ws_tli); return count; } @@ -2363,7 +2389,7 @@ WalSndKill(int code, Datum arg) * more than one. */ static void -XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count) +XLogReadOld(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count) { char *p; XLogRecPtr recptr; @@ -2536,6 +2562,71 @@ retry: } } +/* + * Callback for XLogRead() to open the next segment. 
+ */ +void +WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, + TimeLineID *tli_p, int *file_p) +{ + char path[MAXPGPATH]; + + /*------- + * When reading from a historic timeline, and there is a timeline switch + * within this segment, read from the WAL segment belonging to the new + * timeline. + * + * For example, imagine that this server is currently on timeline 5, and + * we're streaming timeline 4. The switch from timeline 4 to 5 happened at + * 0/13002088. In pg_wal, we have these files: + * + * ... + * 000000040000000000000012 + * 000000040000000000000013 + * 000000050000000000000013 + * 000000050000000000000014 + * ... + * + * In this situation, when requested to send the WAL from segment 0x13, on + * timeline 4, we read the WAL from file 000000050000000000000013. Archive + * recovery prefers files from newer timelines, so if the segment was + * restored from the archive on this server, the file belonging to the old + * timeline, 000000040000000000000013, might not exist. Their contents are + * equal up to the switchpoint, because at a timeline switch, the used + * portion of the old segment is copied to the new file. ------- + */ + *tli_p = sendTimeLine; + if (sendTimeLineIsHistoric) + { + XLogSegNo endSegNo; + + XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize); + if (sendSeg->ws_segno == endSegNo) + *tli_p = sendTimeLineNextTLI; + } + + XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize); + *file_p = BasicOpenFile(path, O_RDONLY | PG_BINARY); + + if (*file_p < 0) + { + /* + * If the file is not found, assume it's because the standby asked for + * a too old WAL segment that has already been removed or recycled. 
+ */ + if (errno == ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("requested WAL segment %s has already been removed", + XLogFileNameP(*tli_p, nextSegNo)))); + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); + } +} + /* * Send out the WAL in its normal physical/stored form. * @@ -2553,6 +2644,8 @@ XLogSendPhysical(void) XLogRecPtr startptr; XLogRecPtr endptr; Size nbytes; + XLogSegNo segno; + XLogReadError errinfo; /* If requested switch the WAL sender to the stopping state. */ if (got_STOPPING) @@ -2768,7 +2861,49 @@ XLogSendPhysical(void) * calls. */ enlargeStringInfo(&output_message, nbytes); - XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes); + +retry: + if (!XLogRead(&output_message.data[output_message.len], + startptr, + nbytes, + sendSeg->ws_tli, /* Pass the current TLI because only + * WalSndSegmentOpen controls whether new + * TLI is needed. */ + sendSeg, + sendCxt, + WalSndSegmentOpen, + &errinfo)) + XLogReadProcessError(&errinfo); + + /* See logical_read_xlog_page(). */ + XLByteToSeg(startptr, segno, sendCxt->ws_segsize); + CheckXLogRemoved(segno, sendSeg->ws_tli); + + /* + * During recovery, the currently-open WAL file might be replaced with the + * file of the same name retrieved from archive. So we always need to + * check what we read was valid after reading into the buffer. If it's + * invalid, we try to open and read the file again. 
+ */ + if (am_cascading_walsender) + { + WalSnd *walsnd = MyWalSnd; + bool reload; + + SpinLockAcquire(&walsnd->mutex); + reload = walsnd->needreload; + walsnd->needreload = false; + SpinLockRelease(&walsnd->mutex); + + if (reload && sendSeg->ws_file >= 0) + { + close(sendSeg->ws_file); + sendSeg->ws_file = -1; + + goto retry; + } + } + output_message.len += nbytes; output_message.data[output_message.len] = '\0'; diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index b79208cd73..4c49d1acf4 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -281,6 +281,46 @@ identify_target_directory(char *directory, char *fname) return NULL; /* not reached */ } +static void +WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt, + TimeLineID *tli_p, int *file_p) +{ + TimeLineID tli = *tli_p; + char fname[MAXPGPATH]; + int tries; + + XLogFileName(fname, tli, nextSegNo, segcxt->ws_segsize); + + /* + * In follow mode there is a short period of time after the server has + * written the end of the previous file before the new file is available. + * So we loop for 5 seconds looking for the file to appear before giving + * up. 
+ */ + for (tries = 0; tries < 10; tries++) + { + *file_p = open_file_in_directory(segcxt->ws_dir, fname); + if (*file_p >= 0) + break; + if (errno == ENOENT) + { + int save_errno = errno; + + /* File not there yet, try again */ + pg_usleep(500 * 1000); + + errno = save_errno; + continue; + } + /* Any other error, fall through and fail */ + break; + } + + if (*file_p < 0) + fatal_error("could not find file \"%s\": %s", + fname, strerror(errno)); +} + /* * Read count bytes from a segment file in the specified directory, for the * given timeline, containing the specified record pointer; store the data in @@ -412,6 +452,7 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, { XLogDumpPrivate *private = state->private_data; int count = XLOG_BLCKSZ; + XLogReadError errinfo; if (private->endptr != InvalidXLogRecPtr) { @@ -426,8 +467,23 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, } } - XLogDumpXLogRead(state->segcxt.ws_dir, private->timeline, targetPagePtr, - readBuff, count); + if (!XLogRead(readBuff, targetPagePtr, count, private->timeline, + &state->seg, &state->segcxt, WALDumpOpenSegment, &errinfo)) + { + WALOpenSegment *seg = errinfo.seg; + char fname[MAXPGPATH]; + + XLogFileName(fname, seg->ws_tli, seg->ws_segno, + state->segcxt.ws_segsize); + + if (errno != 0) + fatal_error("could not read from log file %s, offset %u, length %zu: %s", + fname, seg->ws_off, (Size) errinfo.reqbytes, + strerror(errinfo.read_errno)); + else + fatal_error("could not read from log file %s, offset %u: length: %zu", + fname, seg->ws_off, (Size) errinfo.reqbytes); + } return count; } diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 1bbee386e8..f066b6255d 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -218,6 +218,26 @@ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size, extern void XLogReaderFree(XLogReaderState *state); /* Initialize 
supporting structures */ +/* + * Callback to open the specified WAL segment for reading. + * + * "nextSegNo" is the number of the segment to be opened. + * + * "segcxt" is additional information about the segment. + * + * "tli_p" is an input/output argument. XLogRead() uses it to pass the + * timeline in which the new segment should be found, but the callback can use + * it to return the TLI that it actually opened. + * + * "file_p" points to an address the segment file descriptor should be stored + * at. + * + * BasicOpenFile() is the preferred way to open the segment file in backend + * code, whereas open(2) should be used in frontend. + */ +typedef void (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt, + TimeLineID *tli_p, int *file_p); + extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir); @@ -232,6 +252,28 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state, #ifdef FRONTEND extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); #endif /* FRONTEND */ +/* + * Error information that both backend and frontend caller can process. + * + * XXX Should the name be WALReadError? If so, we probably need to rename + * XLogRead() and XLogReadProcessError() too. + */ +typedef struct XLogReadError +{ + int read_errno; /* errno set by the last read(). */ + int readbytes; /* Bytes read by the last read(). */ + int reqbytes; /* Bytes requested to be read. */ + WALOpenSegment *seg; /* Segment we tried to read from. */ +} XLogReadError; + +extern bool XLogRead(char *buf, XLogRecPtr startptr, Size count, + TimeLineID tli, WALOpenSegment *seg, + WALSegmentContext *segcxt, WALSegmentOpen openSegment, + XLogReadError *errinfo); +#ifndef FRONTEND +void XLogReadProcessError(XLogReadError *errinfo); +#endif + /* Functions for decoding an XLogRecord */ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, -- 2.20.1
>From c7d4d390dc12978dd947dfe97691fbfbe1209e12 Mon Sep 17 00:00:00 2001 From: Antonin Houska <a...@cybertec.at> Date: Fri, 4 Oct 2019 12:07:22 +0200 Subject: [PATCH 2/2] Remove the old implementations of XLogRead(). Done in a separate patch because the diff looks harder to read if one function (XLogRead) is removed and another one (the WALSegmentOpen callback) is added nearby at the same time (the addition and removal of code can get mixed in the diff). --- src/backend/access/transam/xlogutils.c | 122 ---------------- src/backend/replication/walsender.c | 188 ------------------------- src/bin/pg_waldump/pg_waldump.c | 122 ---------------- 3 files changed, 432 deletions(-) diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 007974ea99..0211a68640 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -639,128 +639,6 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, forget_invalid_pages(rnode, forkNum, nblocks); } -/* - * Read 'count' bytes from WAL into 'buf', starting at location 'startptr' - * in timeline 'tli'. - * - * Will open, and keep open, one WAL segment stored in the static file - * descriptor 'sendFile'. This means if XLogRead is used once, there will - * always be one descriptor left open until the process ends, but never - * more than one. - * - * XXX This is very similar to pg_waldump's XLogDumpXLogRead and to XLogRead - * in walsender.c but for small differences (such as lack of elog() in - * frontend). Probably these should be merged at some point. 
- */ -static void -XLogReadOld(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr, - Size count) -{ - char *p; - XLogRecPtr recptr; - Size nbytes; - - /* state maintained across calls */ - static int sendFile = -1; - static XLogSegNo sendSegNo = 0; - static TimeLineID sendTLI = 0; - static uint32 sendOff = 0; - - Assert(segsize == wal_segment_size); - - p = buf; - recptr = startptr; - nbytes = count; - - while (nbytes > 0) - { - uint32 startoff; - int segbytes; - int readbytes; - - startoff = XLogSegmentOffset(recptr, segsize); - - /* Do we need to switch to a different xlog segment? */ - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, segsize) || - sendTLI != tli) - { - char path[MAXPGPATH]; - - if (sendFile >= 0) - close(sendFile); - - XLByteToSeg(recptr, sendSegNo, segsize); - - XLogFilePath(path, tli, sendSegNo, segsize); - - sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY); - - if (sendFile < 0) - { - if (errno == ENOENT) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("requested WAL segment %s has already been removed", - path))); - else - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", - path))); - } - sendOff = 0; - sendTLI = tli; - } - - /* Need to seek in the file? */ - if (sendOff != startoff) - { - if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) - { - char path[MAXPGPATH]; - int save_errno = errno; - - XLogFilePath(path, tli, sendSegNo, segsize); - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not seek in log segment %s to offset %u: %m", - path, startoff))); - } - sendOff = startoff; - } - - /* How many bytes are within this segment? 
*/ - if (nbytes > (segsize - startoff)) - segbytes = segsize - startoff; - else - segbytes = nbytes; - - pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - readbytes = read(sendFile, p, segbytes); - pgstat_report_wait_end(); - if (readbytes <= 0) - { - char path[MAXPGPATH]; - int save_errno = errno; - - XLogFilePath(path, tli, sendSegNo, segsize); - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from log segment %s, offset %u, length %lu: %m", - path, sendOff, (unsigned long) segbytes))); - } - - /* Update state for read */ - recptr += readbytes; - - sendOff += readbytes; - nbytes -= readbytes; - p += readbytes; - } -} - /* * Determine which timeline to read an xlog page from and set the * XLogReaderState's currTLI to that timeline ID. diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index b21b143f99..76a5477389 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -252,9 +252,6 @@ static void WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, TimeLineID *tli_p, int *file_p); -static void XLogReadOld(WALSegmentContext *segcxt, char *buf, - XLogRecPtr startptr, Size count); - /* Initialize walsender process before entering the main command loop */ void InitWalSender(void) @@ -2377,191 +2374,6 @@ WalSndKill(int code, Datum arg) SpinLockRelease(&walsnd->mutex); } -/* - * Read 'count' bytes from WAL into 'buf', starting at location 'startptr' - * - * XXX probably this should be improved to suck data directly from the - * WAL buffers when possible. - * - * Will open, and keep open, one WAL segment stored in the global file - * descriptor sendFile. This means if XLogRead is used once, there will - * always be one descriptor left open until the process ends, but never - * more than one. 
- */ -static void -XLogReadOld(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count) -{ - char *p; - XLogRecPtr recptr; - Size nbytes; - XLogSegNo segno; - -retry: - p = buf; - recptr = startptr; - nbytes = count; - - while (nbytes > 0) - { - uint32 startoff; - int segbytes; - int readbytes; - - startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize); - - if (sendSeg->ws_file < 0 || - !XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize)) - { - char path[MAXPGPATH]; - - /* Switch to another logfile segment */ - if (sendSeg->ws_file >= 0) - close(sendSeg->ws_file); - - XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize); - - /*------- - * When reading from a historic timeline, and there is a timeline - * switch within this segment, read from the WAL segment belonging - * to the new timeline. - * - * For example, imagine that this server is currently on timeline - * 5, and we're streaming timeline 4. The switch from timeline 4 - * to 5 happened at 0/13002088. In pg_wal, we have these files: - * - * ... - * 000000040000000000000012 - * 000000040000000000000013 - * 000000050000000000000013 - * 000000050000000000000014 - * ... - * - * In this situation, when requested to send the WAL from - * segment 0x13, on timeline 4, we read the WAL from file - * 000000050000000000000013. Archive recovery prefers files from - * newer timelines, so if the segment was restored from the - * archive on this server, the file belonging to the old timeline, - * 000000040000000000000013, might not exist. Their contents are - * equal up to the switchpoint, because at a timeline switch, the - * used portion of the old segment is copied to the new file. 
- *------- - */ - sendSeg->ws_tli = sendTimeLine; - if (sendTimeLineIsHistoric) - { - XLogSegNo endSegNo; - - XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize); - if (sendSeg->ws_segno == endSegNo) - sendSeg->ws_tli = sendTimeLineNextTLI; - } - - XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize); - - sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); - if (sendSeg->ws_file < 0) - { - /* - * If the file is not found, assume it's because the standby - * asked for a too old WAL segment that has already been - * removed or recycled. - */ - if (errno == ENOENT) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("requested WAL segment %s has already been removed", - XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno)))); - else - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", - path))); - } - sendSeg->ws_off = 0; - } - - /* Need to seek in the file? */ - if (sendSeg->ws_off != startoff) - { - if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not seek in log segment %s to offset %u: %m", - XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno), - startoff))); - sendSeg->ws_off = startoff; - } - - /* How many bytes are within this segment? 
*/ - if (nbytes > (segcxt->ws_segsize - startoff)) - segbytes = segcxt->ws_segsize - startoff; - else - segbytes = nbytes; - - pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - readbytes = read(sendSeg->ws_file, p, segbytes); - pgstat_report_wait_end(); - if (readbytes < 0) - { - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from log segment %s, offset %u, length %zu: %m", - XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno), - sendSeg->ws_off, (Size) segbytes))); - } - else if (readbytes == 0) - { - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read from log segment %s, offset %u: read %d of %zu", - XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno), - sendSeg->ws_off, readbytes, (Size) segbytes))); - } - - /* Update state for read */ - recptr += readbytes; - - sendSeg->ws_off += readbytes; - nbytes -= readbytes; - p += readbytes; - } - - /* - * After reading into the buffer, check that what we read was valid. We do - * this after reading, because even though the segment was present when we - * opened it, it might get recycled or removed while we read it. The - * read() succeeds in that case, but the data we tried to read might - * already have been overwritten with new WAL records. - */ - XLByteToSeg(startptr, segno, segcxt->ws_segsize); - CheckXLogRemoved(segno, ThisTimeLineID); - - /* - * During recovery, the currently-open WAL file might be replaced with the - * file of the same name retrieved from archive. So we always need to - * check what we read was valid after reading into the buffer. If it's - * invalid, we try to open and read the file again. 
- */ - if (am_cascading_walsender) - { - WalSnd *walsnd = MyWalSnd; - bool reload; - - SpinLockAcquire(&walsnd->mutex); - reload = walsnd->needreload; - walsnd->needreload = false; - SpinLockRelease(&walsnd->mutex); - - if (reload && sendSeg->ws_file >= 0) - { - close(sendSeg->ws_file); - sendSeg->ws_file = -1; - - goto retry; - } - } -} - /* * Callback for XLogRead() to open the next segment. */ diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 4c49d1acf4..39686c235c 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -321,128 +321,6 @@ WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt, fname, strerror(errno)); } -/* - * Read count bytes from a segment file in the specified directory, for the - * given timeline, containing the specified record pointer; store the data in - * the passed buffer. - */ -static void -XLogDumpXLogRead(const char *directory, TimeLineID timeline_id, - XLogRecPtr startptr, char *buf, Size count) -{ - char *p; - XLogRecPtr recptr; - Size nbytes; - - static int sendFile = -1; - static XLogSegNo sendSegNo = 0; - static uint32 sendOff = 0; - - p = buf; - recptr = startptr; - nbytes = count; - - while (nbytes > 0) - { - uint32 startoff; - int segbytes; - int readbytes; - - startoff = XLogSegmentOffset(recptr, WalSegSz); - - if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, WalSegSz)) - { - char fname[MAXFNAMELEN]; - int tries; - - /* Switch to another logfile segment */ - if (sendFile >= 0) - close(sendFile); - - XLByteToSeg(recptr, sendSegNo, WalSegSz); - - XLogFileName(fname, timeline_id, sendSegNo, WalSegSz); - - /* - * In follow mode there is a short period of time after the server - * has written the end of the previous file before the new file is - * available. So we loop for 5 seconds looking for the file to - * appear before giving up. 
- */ - for (tries = 0; tries < 10; tries++) - { - sendFile = open_file_in_directory(directory, fname); - if (sendFile >= 0) - break; - if (errno == ENOENT) - { - int save_errno = errno; - - /* File not there yet, try again */ - pg_usleep(500 * 1000); - - errno = save_errno; - continue; - } - /* Any other error, fall through and fail */ - break; - } - - if (sendFile < 0) - fatal_error("could not find file \"%s\": %s", - fname, strerror(errno)); - sendOff = 0; - } - - /* Need to seek in the file? */ - if (sendOff != startoff) - { - if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) - { - int err = errno; - char fname[MAXPGPATH]; - - XLogFileName(fname, timeline_id, sendSegNo, WalSegSz); - - fatal_error("could not seek in log file %s to offset %u: %s", - fname, startoff, strerror(err)); - } - sendOff = startoff; - } - - /* How many bytes are within this segment? */ - if (nbytes > (WalSegSz - startoff)) - segbytes = WalSegSz - startoff; - else - segbytes = nbytes; - - readbytes = read(sendFile, p, segbytes); - if (readbytes <= 0) - { - int err = errno; - char fname[MAXPGPATH]; - int save_errno = errno; - - XLogFileName(fname, timeline_id, sendSegNo, WalSegSz); - errno = save_errno; - - if (readbytes < 0) - fatal_error("could not read from log file %s, offset %u, length %d: %s", - fname, sendOff, segbytes, strerror(err)); - else if (readbytes == 0) - fatal_error("could not read from log file %s, offset %u: read %d of %zu", - fname, sendOff, readbytes, (Size) segbytes); - } - - /* Update state for read */ - recptr += readbytes; - - sendOff += readbytes; - nbytes -= readbytes; - p += readbytes; - } -} - /* * XLogReader read_page callback */ -- 2.20.1