On 2020-May-08, Kyotaro Horiguchi wrote: > I agree to the direction of this patch. Thanks for the explanation. > The patch looks good to me except the two points below.
Thanks! I pushed the patch. I fixed the walsender commentary as you suggested, but I'm still of the opinion that we might want to use the XLogReader abstraction in the physical walsender rather than work without it; if nothing else, that would simplify WALRead's API. I didn't change this one, though: > wal_segment_close(XlogReaderState *state) is setting > state->seg.ws_file to -1. On the other hand wal_segment_close(state,..) > doesn't update ws_file and the caller sets the returned value to > (eventually) the same field. > > + seg->ws_file = state->routine.segment_open(state, > nextSegNo, > + > segcxt, &tli); > > If you are willing to do so, I think it is better to make the callback > functions are responsible to update the seg.ws_file and the callers > don't care. I agree that this would be a good idea, but it's more than just a handful of lines of changes, so I think we should consider it separately. Attached as 0002. I also realized while doing this that we can further simplify WALRead()'s API if we're willing to bend walsender a little bit more into the fake xlogreader thing; that's 0001. I marked the open item closed nonetheless. -- Álvaro Herrera https://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
>From 600e28124ffb2741482bb096d5615b521ae83850 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Fri, 8 May 2020 16:40:24 -0400 Subject: [PATCH 1/2] fix WALRead API to take seg/segcxt from XLogReaderState --- src/backend/access/transam/xlogreader.c | 31 +++++++++++-------------- src/backend/access/transam/xlogutils.c | 1 - src/backend/replication/walsender.c | 6 ++--- src/bin/pg_waldump/pg_waldump.c | 1 - src/include/access/xlogreader.h | 4 +--- 5 files changed, 17 insertions(+), 26 deletions(-) diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 7cee8b92c9..f42dee2640 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1050,8 +1050,6 @@ err: * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL * fetched from timeline 'tli'. * - * 'seg/segcxt' identify the last segment used. - * * Returns true if succeeded, false if an error occurs, in which case * 'errinfo' receives error details. * @@ -1061,7 +1059,6 @@ err: bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, - WALOpenSegment *seg, WALSegmentContext *segcxt, WALReadError *errinfo) { char *p; @@ -1078,34 +1075,34 @@ WALRead(XLogReaderState *state, int segbytes; int readbytes; - startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize); + startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize); /* * If the data we want is not in a segment we have open, close what we * have (if anything) and open the next one, using the caller's * provided openSegment callback. 
*/ - if (seg->ws_file < 0 || - !XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) || - tli != seg->ws_tli) + if (state->seg.ws_file < 0 || + !XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) || + tli != state->seg.ws_tli) { XLogSegNo nextSegNo; - if (seg->ws_file >= 0) + if (state->seg.ws_file >= 0) state->routine.segment_close(state); - XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize); - seg->ws_file = state->routine.segment_open(state, nextSegNo, - segcxt, &tli); + XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize); + state->seg.ws_file = state->routine.segment_open(state, nextSegNo, + &state->segcxt, &tli); /* Update the current segment info. */ - seg->ws_tli = tli; - seg->ws_segno = nextSegNo; + state->seg.ws_tli = tli; + state->seg.ws_segno = nextSegNo; } /* How many bytes are within this segment? */ - if (nbytes > (segcxt->ws_segsize - startoff)) - segbytes = segcxt->ws_segsize - startoff; + if (nbytes > (state->segcxt.ws_segsize - startoff)) + segbytes = state->segcxt.ws_segsize - startoff; else segbytes = nbytes; @@ -1115,7 +1112,7 @@ WALRead(XLogReaderState *state, /* Reset errno first; eases reporting non-errno-affecting errors */ errno = 0; - readbytes = pg_pread(seg->ws_file, p, segbytes, (off_t) startoff); + readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff); #ifndef FRONTEND pgstat_report_wait_end(); @@ -1127,7 +1124,7 @@ WALRead(XLogReaderState *state, errinfo->wre_req = segbytes; errinfo->wre_read = readbytes; errinfo->wre_off = startoff; - errinfo->wre_seg = *seg; + errinfo->wre_seg = state->seg; return false; } diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index bbd801513a..fc0bb7d059 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -947,7 +947,6 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * zero-padded up to the page boundary if it's incomplete. 
*/ if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli, - &state->seg, &state->segcxt, &errinfo)) WALReadRaiseError(&errinfo); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d18475b854..ed8c08cb6a 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -840,8 +840,6 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req sendSeg->ws_tli, /* Pass the current TLI because only * WalSndSegmentOpen controls whether new * TLI is needed. */ - sendSeg, - sendCxt, &errinfo)) WALReadRaiseError(&errinfo); @@ -2760,6 +2758,8 @@ XLogSendPhysical(void) enlargeStringInfo(&output_message, nbytes); retry: + fake_xlogreader.seg = *sendSeg; + fake_xlogreader.segcxt = *sendCxt; if (!WALRead(&fake_xlogreader, &output_message.data[output_message.len], startptr, @@ -2767,8 +2767,6 @@ retry: sendSeg->ws_tli, /* Pass the current TLI because only * WalSndSegmentOpen controls whether new * TLI is needed. 
*/ - sendSeg, - sendCxt, &errinfo)) WALReadRaiseError(&errinfo); diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index e29f65500f..46734914b7 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -356,7 +356,6 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, } if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline, - &state->seg, &state->segcxt, &errinfo)) { WALOpenSegment *seg = &errinfo.wre_seg; diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 81af200f5e..e77f478d68 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -301,9 +301,7 @@ typedef struct WALReadError extern bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, - TimeLineID tli, WALOpenSegment *seg, - WALSegmentContext *segcxt, - WALReadError *errinfo); + TimeLineID tli, WALReadError *errinfo); /* Functions for decoding an XLogRecord */ -- 2.20.1
>From 721cadab3f1ab88c7b2f12ed2ede5dd3c9455856 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Fri, 8 May 2020 17:03:09 -0400 Subject: [PATCH 2/2] Simplify XLogReader's open_segment API Instead of returning the file descriptor, install it directly in XLogReaderState->seg.ws_file. --- src/backend/access/transam/xlogreader.c | 4 ++-- src/backend/access/transam/xlogutils.c | 32 ++++++++++++------------- src/backend/replication/walsender.c | 12 ++++------ src/bin/pg_waldump/pg_waldump.c | 9 ++++--- src/include/access/xlogreader.h | 12 +++++----- src/include/access/xlogutils.h | 2 +- 6 files changed, 33 insertions(+), 38 deletions(-) diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index f42dee2640..a533241370 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1092,8 +1092,8 @@ WALRead(XLogReaderState *state, state->routine.segment_close(state); XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize); - state->seg.ws_file = state->routine.segment_open(state, nextSegNo, - &state->segcxt, &tli); + state->routine.segment_open(state, nextSegNo, &state->segcxt, &tli); + Assert(state->seg.ws_file >= 0); /* shouldn't happen */ /* Update the current segment info. 
*/ state->seg.ws_tli = tli; diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index fc0bb7d059..1cc2c624a4 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -784,7 +784,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa } /* XLogReaderRoutine->segment_open callback for local pg_wal files */ -int +void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, WALSegmentContext *segcxt, TimeLineID *tli_p) { @@ -793,22 +793,20 @@ wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, int fd; XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize); - fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); - if (fd >= 0) - return fd; - - if (errno == ENOENT) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("requested WAL segment %s has already been removed", - path))); - else - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", - path))); - - return -1; /* keep compiler quiet */ + state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (state->seg.ws_file < 0) + { + if (errno == ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("requested WAL segment %s has already been removed", + path))); + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); + } } /* stock XLogReaderRoutine->segment_close callback */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index ed8c08cb6a..b9f029d44f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -248,7 +248,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); -static int WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, 
+static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, WALSegmentContext *segcxt, TimeLineID *tli_p); static void UpdateSpillStats(LogicalDecodingContext *ctx); @@ -2445,13 +2445,12 @@ WalSndKill(int code, Datum arg) } /* XLogReaderRoutine->segment_open callback */ -static int +static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, WALSegmentContext *segcxt, TimeLineID *tli_p) { char path[MAXPGPATH]; - int fd; /*------- * When reading from a historic timeline, and there is a timeline switch @@ -2488,9 +2487,9 @@ WalSndSegmentOpen(XLogReaderState *state, } XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize); - fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); - if (fd >= 0) - return fd; + state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (state->seg.ws_file >= 0) + return; /* * If the file is not found, assume it's because the standby asked for a @@ -2513,7 +2512,6 @@ WalSndSegmentOpen(XLogReaderState *state, (errcode_for_file_access(), errmsg("could not open file \"%s\": %m", path))); - return -1; /* keep compiler quiet */ } /* diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 46734914b7..1a5c5a157c 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -280,7 +280,7 @@ identify_target_directory(char *directory, char *fname) } /* pg_waldump's XLogReaderRoutine->segment_open callback */ -static int +static void WALDumpOpenSegment(XLogReaderState *state, XLogSegNo nextSegNo, WALSegmentContext *segcxt, TimeLineID *tli_p) @@ -300,9 +300,9 @@ WALDumpOpenSegment(XLogReaderState *state, */ for (tries = 0; tries < 10; tries++) { - fd = open_file_in_directory(segcxt->ws_dir, fname); - if (fd >= 0) - return fd; + state->seg.ws_file = open_file_in_directory(segcxt->ws_dir, fname); + if (state->seg.ws_file >= 0) + return; if (errno == ENOENT) { int save_errno = errno; @@ -318,7 +318,6 @@ WALDumpOpenSegment(XLogReaderState *state, } 
fatal_error("could not find file \"%s\": %m", fname); - return -1; /* keep compiler quiet */ } /* diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index e77f478d68..b73df02218 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -63,10 +63,10 @@ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, int reqLen, XLogRecPtr targetRecPtr, char *readBuf); -typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader, - XLogSegNo nextSegNo, - WALSegmentContext *segcxt, - TimeLineID *tli_p); +typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader, + XLogSegNo nextSegNo, + WALSegmentContext *segcxt, + TimeLineID *tli_p); typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader); typedef struct XLogReaderRoutine @@ -94,8 +94,8 @@ typedef struct XLogReaderRoutine XLogPageReadCB page_read; /* - * Callback to open the specified WAL segment for reading. The file - * descriptor of the opened segment shall be returned. In case of + * Callback to open the specified WAL segment for reading. ->seg.ws_file + * shall be set to the file descriptor of the opened segment. In case of * failure, an error shall be raised by the callback and it shall not * return. * diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 68ce815476..b7bdc5db34 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -50,7 +50,7 @@ extern void FreeFakeRelcacheEntry(Relation fakerel); extern int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page); -extern int wal_segment_open(XLogReaderState *state, +extern void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, WALSegmentContext *segcxt, TimeLineID *tli_p); -- 2.20.1