On 2020-May-12, Kyotaro Horiguchi wrote:

> I'm not sure the reason for wal_segment_open and WalSndSegmentOpen
> being modified different way about error handling of BasicOpenFile, I
> prefer the WalSndSegmentOpen way.  However, that difference doesn't
> harm anything so I'm fine with the current patch.

Yeah, I couldn't decide which style I liked the most.  I used the one
you suggested.

> +	fake_xlogreader.seg = *sendSeg;
> +	fake_xlogreader.segcxt = *sendCxt;
>
> fake_xlogreader.seg is a different instance from *sendSeg.  WALRead
> modifies fake_xlogreader.seg but does not modify *sendSeg.  Thus the
> change doesn't persist.  On the other hand WalSndSegmentOpen reads
> *sendSeg, which is not under control of WALRead.
>
> Maybe we had better to make fake_xlogreader be a global variable of
> walsender.c that covers the current sendSeg and sendCxt.

I tried that.  I was about to leave it at just modifying physical
walsender (simple enough, and it passed tests), but I noticed that
WalSndErrorCleanup() would be a problem because we don't know if it's
physical or logical walsender.  So in the end I added a global
'xlogreader' pointer in walsender.c -- logical walsender sets it to the
true xlogreader it has inside the logical decoding context, and physical
walsender sets it to its fake xlogreader.  That seems to work nicely.
sendSeg/sendCxt are gone entirely.

Logical walsender was doing WALOpenSegmentInit() uselessly during
InitWalSender(), since it was using the separate sendSeg/sendCxt structs
instead of the ones in its xlogreader.  (Some mysteries become clearer!)

It's slightly disquieting that the segment_close call in
WalSndErrorCleanup is not covered, but in any case this should work well
AFAICS.  I think this is simpler to understand than formerly.

Now the only silliness remaining is the fact that different users of the
xlogreader interface are doing different things about the TLI.
Hopefully we can unify everything to something sensible one day .. but
that's not going to happen in pg13.

I'll get this pushed tomorrow, unless there are further objections.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
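
For readers following along, here is a rough standalone sketch of the
problem Horiguchi-san pointed out and of the shape of the fix.  It is not
PostgreSQL code: ToySegment, ToyReader, toy_segment_open, toy_read,
read_via_copy and read_via_pointer are all hypothetical stand-ins invented
for illustration, and the "file descriptor" is just a made-up number.  It
only assumes what is described above: the open callback stores the
descriptor in the reader's own segment struct, and callers share one reader
through a pointer instead of copying segment state into a temporary.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for WALOpenSegment and XLogReaderState. */
typedef struct ToySegment
{
	int			file;		/* pretend file descriptor, -1 if none */
	unsigned long segno;	/* segment currently open */
} ToySegment;

typedef struct ToyReader ToyReader;
typedef void (*ToySegmentOpenCB) (ToyReader *reader, unsigned long segno);

struct ToyReader
{
	ToySegment	seg;
	ToySegmentOpenCB segment_open;
};

/* The callback stores the descriptor in the reader instead of returning it. */
static void
toy_segment_open(ToyReader *reader, unsigned long segno)
{
	reader->seg.file = 100 + (int) segno;	/* made-up descriptor value */
}

/* Loosely mimics the segment-switch logic: all state lives in *reader. */
static void
toy_read(ToyReader *reader, unsigned long segno)
{
	if (reader->seg.file < 0 || reader->seg.segno != segno)
	{
		reader->segment_open(reader, segno);
		assert(reader->seg.file >= 0);
		reader->seg.segno = segno;
	}
}

/* Problematic shape: state is copied in, so the update never reaches *send_seg. */
static int
read_via_copy(ToySegment *send_seg, unsigned long segno)
{
	ToyReader	tmp = {*send_seg, toy_segment_open};

	toy_read(&tmp, segno);
	return send_seg->file;	/* unchanged: only tmp.seg was modified */
}

/* Fixed shape: one long-lived reader, reached through a shared pointer. */
static ToyReader fake_reader = {{-1, 0}, toy_segment_open};
static ToyReader *current_reader = &fake_reader;

static int
read_via_pointer(unsigned long segno)
{
	toy_read(current_reader, segno);
	return current_reader->seg.file;
}
```

In this toy model, read_via_copy() leaves the caller's segment struct
untouched, whereas read_via_pointer() leaves the opened descriptor visible
to anything else that goes through current_reader, which is the property
the error-cleanup path needs.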

From 63160d69f734bdb2ed45aa839ec2903f12194d50 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Tue, 12 May 2020 19:39:57 -0400
Subject: [PATCH] Adjust walsender usage of xlogreader, simplify APIs

Per comments from Kyotaro Horiguchi
---
 src/backend/access/transam/xlogreader.c | 35 +++++----
 src/backend/access/transam/xlogutils.c  | 16 ++--
 src/backend/replication/walsender.c     | 98 ++++++++++++-------------
 src/bin/pg_waldump/pg_waldump.c         | 16 ++--
 src/include/access/xlogreader.h         | 20 ++---
 src/include/access/xlogutils.h          |  3 +-
 6 files changed, 83 insertions(+), 105 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 7cee8b92c9..aae3fee24c 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1044,14 +1044,12 @@ err:
 
 /*
  * Helper function to ease writing of XLogRoutine->page_read callbacks.
- * If this function is used, caller must supply an open_segment callback in
+ * If this function is used, caller must supply a segment_open callback in
  * 'state', as that is used here.
  *
  * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
  * fetched from timeline 'tli'.
  *
- * 'seg/segcxt' identify the last segment used.
- *
  * Returns true if succeeded, false if an error occurs, in which case
  * 'errinfo' receives error details.
  *
@@ -1061,7 +1059,6 @@ err:
 bool
 WALRead(XLogReaderState *state,
 		char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
-		WALOpenSegment *seg, WALSegmentContext *segcxt,
 		WALReadError *errinfo)
 {
 	char	   *p;
@@ -1078,34 +1075,36 @@ WALRead(XLogReaderState *state,
 		int			segbytes;
 		int			readbytes;
 
-		startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
+		startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
 
 		/*
 		 * If the data we want is not in a segment we have open, close what we
 		 * have (if anything) and open the next one, using the caller's
 		 * provided openSegment callback.
 		 */
-		if (seg->ws_file < 0 ||
-			!XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) ||
-			tli != seg->ws_tli)
+		if (state->seg.ws_file < 0 ||
+			!XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
+			tli != state->seg.ws_tli)
 		{
 			XLogSegNo	nextSegNo;
 
-			if (seg->ws_file >= 0)
+			if (state->seg.ws_file >= 0)
 				state->routine.segment_close(state);
 
-			XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
-			seg->ws_file = state->routine.segment_open(state, nextSegNo,
-													   segcxt, &tli);
+			XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
+			state->routine.segment_open(state, nextSegNo, &tli);
+
+			/* This shouldn't happen -- indicates a bug in segment_open */
+			Assert(state->seg.ws_file >= 0);
 
 			/* Update the current segment info. */
-			seg->ws_tli = tli;
-			seg->ws_segno = nextSegNo;
+			state->seg.ws_tli = tli;
+			state->seg.ws_segno = nextSegNo;
 		}
 
 		/* How many bytes are within this segment? */
-		if (nbytes > (segcxt->ws_segsize - startoff))
-			segbytes = segcxt->ws_segsize - startoff;
+		if (nbytes > (state->segcxt.ws_segsize - startoff))
+			segbytes = state->segcxt.ws_segsize - startoff;
 		else
 			segbytes = nbytes;
 
@@ -1115,7 +1114,7 @@ WALRead(XLogReaderState *state,
 
 		/* Reset errno first; eases reporting non-errno-affecting errors */
 		errno = 0;
-		readbytes = pg_pread(seg->ws_file, p, segbytes, (off_t) startoff);
+		readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
 
 #ifndef FRONTEND
 		pgstat_report_wait_end();
@@ -1127,7 +1126,7 @@ WALRead(XLogReaderState *state,
 			errinfo->wre_req = segbytes;
 			errinfo->wre_read = readbytes;
 			errinfo->wre_off = startoff;
-			errinfo->wre_seg = *seg;
+			errinfo->wre_seg = state->seg;
 			return false;
 		}
 
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 0bb69447c2..322b0e8ff5 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -784,18 +784,17 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 }
 
 /* XLogReaderRoutine->segment_open callback for local pg_wal files */
-int
+void
 wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
-				 WALSegmentContext *segcxt, TimeLineID *tli_p)
+				 TimeLineID *tli_p)
 {
 	TimeLineID	tli = *tli_p;
 	char		path[MAXPGPATH];
-	int			fd;
 
-	XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize);
-	fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-	if (fd >= 0)
-		return fd;
+	XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize);
+	state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+	if (state->seg.ws_file >= 0)
+		return;
 
 	if (errno == ENOENT)
 		ereport(ERROR,
@@ -807,8 +806,6 @@ wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
 				(errcode_for_file_access(),
 				 errmsg("could not open file \"%s\": %m",
 						path)));
-
-	return -1;					/* keep compiler quiet */
 }
 
 /* stock XLogReaderRoutine->segment_close callback */
@@ -947,7 +944,6 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
 	if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
-				 &state->seg, &state->segcxt,
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9f14b99231..3367aa98f8 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -129,8 +129,14 @@ bool		log_replication_commands = false;
  */
 bool		wake_wal_senders = false;
 
-static WALOpenSegment *sendSeg = NULL;
-static WALSegmentContext *sendCxt = NULL;
+/*
+ * Physical walsender does not use xlogreader to read WAL, but it does use a
+ * fake one to keep state.  Logical walsender uses a proper xlogreader.  Both
+ * keep the 'xlogreader' pointer to the right one, for the sake of common
+ * routines.
+ */
+static XLogReaderState fake_xlogreader;
+static XLogReaderState *xlogreader;
 
 /*
  * These variables keep track of the state of the timeline we're currently
@@ -248,8 +254,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
-static int	WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
-							  WALSegmentContext *segcxt, TimeLineID *tli_p);
+static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
+							  TimeLineID *tli_p);
 static void UpdateSpillStats(LogicalDecodingContext *ctx);
 
 
@@ -280,12 +286,19 @@ InitWalSender(void)
 	/* Initialize empty timestamp buffer for lag tracking. */
 	lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
 
-	/* Make sure we can remember the current read position in XLOG. */
-	sendSeg = (WALOpenSegment *)
-		MemoryContextAlloc(TopMemoryContext, sizeof(WALOpenSegment));
-	sendCxt = (WALSegmentContext *)
-		MemoryContextAlloc(TopMemoryContext, sizeof(WALSegmentContext));
-	WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL);
+	/*
+	 * Prepare physical walsender's fake xlogreader struct.  Logical walsender
+	 * does this later.
+	 */
+	if (!am_db_walsender)
+	{
+		xlogreader = &fake_xlogreader;
+		xlogreader->routine =
+			*XL_ROUTINE(.segment_open = WalSndSegmentOpen,
+						.segment_close = wal_segment_close);
+		WALOpenSegmentInit(&xlogreader->seg, &xlogreader->segcxt,
+						   wal_segment_size, NULL);
+	}
 }
 
 /*
@@ -302,11 +315,8 @@ WalSndErrorCleanup(void)
 	ConditionVariableCancelSleep();
 	pgstat_report_wait_end();
 
-	if (sendSeg->ws_file >= 0)
-	{
-		close(sendSeg->ws_file);
-		sendSeg->ws_file = -1;
-	}
+	if (xlogreader->seg.ws_file >= 0)
+		wal_segment_close(xlogreader);
 
 	if (MyReplicationSlot != NULL)
 		ReplicationSlotRelease();
@@ -837,11 +847,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 				 cur_page,
 				 targetPagePtr,
 				 XLOG_BLCKSZ,
-				 sendSeg->ws_tli,	/* Pass the current TLI because only
+				 state->seg.ws_tli, /* Pass the current TLI because only
 									 * WalSndSegmentOpen controls whether new
 									 * TLI is needed. */
-				 sendSeg,
-				 sendCxt,
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
@@ -852,8 +860,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	 * read() succeeds in that case, but the data we tried to read might
 	 * already have been overwritten with new WAL records.
 	 */
-	XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize);
-	CheckXLogRemoved(segno, sendSeg->ws_tli);
+	XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
+	CheckXLogRemoved(segno, state->seg.ws_tli);
 
 	return count;
 }
@@ -1176,6 +1184,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 										   .segment_close = wal_segment_close),
 							  WalSndPrepareWrite, WalSndWriteData,
 							  WalSndUpdateProgress);
+	xlogreader = logical_decoding_ctx->reader;
 
 	WalSndSetState(WALSNDSTATE_CATCHUP);
 
@@ -2447,13 +2456,11 @@ WalSndKill(int code, Datum arg)
 }
 
 /* XLogReaderRoutine->segment_open callback */
-static int
-WalSndSegmentOpen(XLogReaderState *state,
-				  XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+static void
+WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
 				  TimeLineID *tli_p)
 {
 	char		path[MAXPGPATH];
-	int			fd;
 
 	/*-------
 	 * When reading from a historic timeline, and there is a timeline switch
@@ -2484,15 +2491,15 @@ WalSndSegmentOpen(XLogReaderState *state,
 	{
 		XLogSegNo	endSegNo;
 
-		XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
-		if (sendSeg->ws_segno == endSegNo)
+		XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
+		if (state->seg.ws_segno == endSegNo)
 			*tli_p = sendTimeLineNextTLI;
 	}
 
-	XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
-	fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-	if (fd >= 0)
-		return fd;
+	XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
+	state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+	if (state->seg.ws_file >= 0)
+		return;
 
 	/*
 	 * If the file is not found, assume it's because the standby asked for a
@@ -2515,7 +2522,6 @@ WalSndSegmentOpen(XLogReaderState *state,
 				(errcode_for_file_access(),
 				 errmsg("could not open file \"%s\": %m",
 						path)));
-	return -1;					/* keep compiler quiet */
 }
 
 /*
@@ -2537,12 +2543,6 @@ XLogSendPhysical(void)
 	Size		nbytes;
 	XLogSegNo	segno;
 	WALReadError errinfo;
-	static XLogReaderState fake_xlogreader =
-	{
-		/* Fake xlogreader state for WALRead */
-		.routine.segment_open = WalSndSegmentOpen,
-		.routine.segment_close = wal_segment_close
-	};
 
 	/* If requested switch the WAL sender to the stopping state. */
 	if (got_STOPPING)
@@ -2685,9 +2685,8 @@ XLogSendPhysical(void)
 	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
 	{
 		/* close the current file. */
-		if (sendSeg->ws_file >= 0)
-			close(sendSeg->ws_file);
-		sendSeg->ws_file = -1;
+		if (xlogreader->seg.ws_file >= 0)
+			wal_segment_close(xlogreader);
 
 		/* Send CopyDone */
 		pq_putmessage_noblock('c', NULL, 0);
@@ -2760,21 +2759,19 @@ XLogSendPhysical(void)
 	enlargeStringInfo(&output_message, nbytes);
 
 retry:
-	if (!WALRead(&fake_xlogreader,
+	if (!WALRead(xlogreader,
 				 &output_message.data[output_message.len],
 				 startptr,
 				 nbytes,
-				 sendSeg->ws_tli,	/* Pass the current TLI because only
-									 * WalSndSegmentOpen controls whether new
-									 * TLI is needed. */
-				 sendSeg,
-				 sendCxt,
+				 xlogreader->seg.ws_tli,	/* Pass the current TLI because
+											 * only WalSndSegmentOpen controls
+											 * whether new TLI is needed. */
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
 	/* See logical_read_xlog_page(). */
-	XLByteToSeg(startptr, segno, sendCxt->ws_segsize);
-	CheckXLogRemoved(segno, sendSeg->ws_tli);
+	XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
+	CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
 
 	/*
 	 * During recovery, the currently-open WAL file might be replaced with the
@@ -2792,10 +2789,9 @@ retry:
 	walsnd->needreload = false;
 	SpinLockRelease(&walsnd->mutex);
 
-	if (reload && sendSeg->ws_file >= 0)
+	if (reload && xlogreader->seg.ws_file >= 0)
 	{
-		close(sendSeg->ws_file);
-		sendSeg->ws_file = -1;
+		wal_segment_close(xlogreader);
 
 		goto retry;
 	}
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index e29f65500f..d1a0678935 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -280,17 +280,15 @@ identify_target_directory(char *directory, char *fname)
 }
 
 /* pg_waldump's XLogReaderRoutine->segment_open callback */
-static int
-WALDumpOpenSegment(XLogReaderState *state,
-				   XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+static void
+WALDumpOpenSegment(XLogReaderState *state, XLogSegNo nextSegNo,
 				   TimeLineID *tli_p)
 {
 	TimeLineID	tli = *tli_p;
 	char		fname[MAXPGPATH];
-	int			fd;
 	int			tries;
 
-	XLogFileName(fname, tli, nextSegNo, segcxt->ws_segsize);
+	XLogFileName(fname, tli, nextSegNo, state->segcxt.ws_segsize);
 
 	/*
 	 * In follow mode there is a short period of time after the server has
@@ -300,9 +298,9 @@ WALDumpOpenSegment(XLogReaderState *state,
 	 */
 	for (tries = 0; tries < 10; tries++)
 	{
-		fd = open_file_in_directory(segcxt->ws_dir, fname);
-		if (fd >= 0)
-			return fd;
+		state->seg.ws_file = open_file_in_directory(state->segcxt.ws_dir, fname);
+		if (state->seg.ws_file >= 0)
+			return;
 		if (errno == ENOENT)
 		{
 			int			save_errno = errno;
@@ -318,7 +316,6 @@ WALDumpOpenSegment(XLogReaderState *state,
 	}
 
 	fatal_error("could not find file \"%s\": %m", fname);
-	return -1;					/* keep compiler quiet */
 }
 
 /*
@@ -356,7 +353,6 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 	}
 
 	if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
-				 &state->seg, &state->segcxt,
 				 &errinfo))
 	{
 		WALOpenSegment *seg = &errinfo.wre_seg;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 80cf62acb7..c21b0ba972 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -63,10 +63,9 @@ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
 							   int reqLen,
 							   XLogRecPtr targetRecPtr,
 							   char *readBuf);
-typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
-								 XLogSegNo nextSegNo,
-								 WALSegmentContext *segcxt,
-								 TimeLineID *tli_p);
+typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
+								  XLogSegNo nextSegNo,
+								  TimeLineID *tli_p);
 typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
 
 typedef struct XLogReaderRoutine
@@ -94,21 +93,16 @@ typedef struct XLogReaderRoutine
 	XLogPageReadCB page_read;
 
 	/*
-	 * Callback to open the specified WAL segment for reading.  The file
-	 * descriptor of the opened segment shall be returned.  In case of
+	 * Callback to open the specified WAL segment for reading.  ->seg.ws_file
+	 * shall be set to the file descriptor of the opened segment.  In case of
 	 * failure, an error shall be raised by the callback and it shall not
 	 * return.
 	 *
 	 * "nextSegNo" is the number of the segment to be opened.
 	 *
-	 * "segcxt" is additional information about the segment.
-	 *
 	 * "tli_p" is an input/output argument. WALRead() uses it to pass the
 	 * timeline in which the new segment should be found, but the callback can
 	 * use it to return the TLI that it actually opened.
-	 *
-	 * BasicOpenFile() is the preferred way to open the segment file in
-	 * backend code, whereas open(2) should be used in frontend.
 	 */
 	WALSegmentOpenCB segment_open;
 
@@ -301,9 +295,7 @@ typedef struct WALReadError
 
 extern bool WALRead(XLogReaderState *state,
 					char *buf, XLogRecPtr startptr, Size count,
-					TimeLineID tli, WALOpenSegment *seg,
-					WALSegmentContext *segcxt,
-					WALReadError *errinfo);
+					TimeLineID tli, WALReadError *errinfo);
 
 /* Functions for decoding an XLogRecord */
 
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 68ce815476..e59b6cf3a9 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -50,9 +50,8 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
 extern int	read_local_xlog_page(XLogReaderState *state,
 								 XLogRecPtr targetPagePtr, int reqLen,
 								 XLogRecPtr targetRecPtr, char *cur_page);
-extern int	wal_segment_open(XLogReaderState *state,
+extern void wal_segment_open(XLogReaderState *state,
 							 XLogSegNo nextSegNo,
-							 WALSegmentContext *segcxt,
 							 TimeLineID *tli_p);
 extern void wal_segment_close(XLogReaderState *state);
 
-- 
2.20.1