On 2020-Apr-27, Kyotaro Horiguchi wrote:

> At Fri, 24 Apr 2020 11:48:46 -0400, Alvaro Herrera <alvhe...@2ndquadrant.com> wrote in
> Sorry for the ambiguity; I didn't mean that I minded that this conflicts
> with my patch or that I don't want this to be committed.  It is easily
> rebased on this patch.  What I was anxious about is that the new
> callback struct might be more flexible than required.  So I "mildly"
> objected, and I won't be disappointed if this patch is committed.

... well, yeah, maybe it is too flexible.  And perhaps we could further
tweak this interface so that the file descriptor is not part of
XLogReader at all; with such a change, it would make more sense to
worry about the "close" callback not being "close" but something like
"cleanup", as you suggest.  But right now, and thinking from the point
of view of going into postgres 13 beta shortly, it seems to me that
XLogReader is just a very leaky abstraction, since both it and its
users are aware of the fact that there is a file descriptor.

Maybe with your rework for encryption you'll want to remove the FD from
XLogReader altogether, and move it elsewhere.  Or maybe not.  But it
seems to me that my suggested approach is sensible, and better than the
current situation.  (Let's keep in mind that the primary concern here is
that the callstack is way too complicated: you ask XLogReader for data,
it calls your read callback, that one calls WALRead passing your
openSegment callback, and the FD gets stuffed into XLogReaderState ...
the way it leaks, it is a sieve, not an abstraction.)

> > I have to admit that until today I hadn't realized that that's what your
> > patch series was doing.  I'm not familiar with how you intend to
> > implement WAL encryption on top of this, but at first blush I'm not
> > liking this proposed design too much.
>
> Right. I might be too much in detail, but it simplifies the call
> tree. Anyway, that is another discussion, though :)

Okay.  We can discuss further changes later, of course.
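To make the callback arrangement being discussed concrete, here is a small self-contained C model of it. It is only a sketch under stated assumptions, not PostgreSQL code: ToyReader, ToyRoutine, TOY_ROUTINE(), toy_read() and the fake callbacks are hypothetical stand-ins for XLogReaderState, XLogReaderRoutine, XL_ROUTINE() and WALRead(), and the "file descriptor" is just an int produced by the fake open callback.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Toy model of a reader whose support callbacks live in one struct.
 * All names are hypothetical stand-ins, not PostgreSQL APIs.
 */
typedef struct ToyReader ToyReader;

typedef int (*ToyPageReadCB) (ToyReader *r, long pageptr);
typedef int (*ToySegOpenCB) (ToyReader *r, long segno);
typedef void (*ToySegCloseCB) (ToyReader *r);

typedef struct ToyRoutine
{
	ToyPageReadCB page_read;	/* unused in this sketch */
	ToySegOpenCB segment_open;	/* needed only by the toy_read() helper */
	ToySegCloseCB segment_close;
} ToyRoutine;

/* Same trick as XL_ROUTINE: a C99 compound literal; omitted members are NULL */
#define TOY_ROUTINE(...) &(ToyRoutine){__VA_ARGS__}

struct ToyReader
{
	ToyRoutine	routine;		/* copied by value at init time */
	int			fd;				/* "open segment" descriptor, -1 if none */
	long		segno;			/* segment the fd belongs to, -1 if none */
	int			opens;			/* counters, so callers can observe calls */
	int			closes;
};

static void
toy_reader_init(ToyReader *r, const ToyRoutine *routine)
{
	r->routine = *routine;		/* the compound literal may now go away */
	r->fd = -1;
	r->segno = -1;
	r->opens = 0;
	r->closes = 0;
}

/*
 * Helper in the spirit of WALRead(): it switches segments through the
 * reader's own callbacks, so it requires segment_open to be set.
 */
static int
toy_read(ToyReader *r, long segno)
{
	if (r->segno != segno)
	{
		if (r->fd >= 0)
			r->routine.segment_close(r);
		assert(r->routine.segment_open != NULL);
		r->fd = r->routine.segment_open(r, segno);
		r->segno = segno;
	}
	return r->fd;
}

/* Like XLogReaderFree(): close whatever segment is still open */
static void
toy_reader_free(ToyReader *r)
{
	if (r->fd >= 0)
		r->routine.segment_close(r);
}

/* Fake callbacks: the "descriptor" is simply 100 + segment number */
static int
fake_open(ToyReader *r, long segno)
{
	r->opens++;
	return (int) (100 + segno);
}

static void
fake_close(ToyReader *r)
{
	r->closes++;
	r->fd = -1;
}
```

Two properties carry over to the real patch: members left out of the compound literal are zero-initialized, so XL_ROUTINE(.page_read = ...) leaves segment_open and segment_close as NULL; and because the struct is copied into the reader at allocation time (state->routine = *routine), the temporary compound literal need not outlive the allocation call. The model also shows the leak under discussion: both the helper and the close callback manipulate the reader's fd directly.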
> It looks as if the open/read/close-callbacks are generic and on the
> same interface layer, but actually the open-callback is dedicated to
> WALRead and it is useless when the read-callback doesn't use
> WALRead. What I was anxious about is that the open-callback is
> uselessly exposing the secret of the read-callback.

Well, I don't think we care about that.  WALRead can be thought of as
just a helper function that you may use to write your read callback.
The comments I added explain this.

> I meant concretely that we only have read- and cleanup- callbacks in
> xlogreader state. The caller of XLogReaderAllocate specifies the
> cleanup-callback that is to be used to clean up what the
> reader-callback left behind, in the same manner as this patch does.
> The only reason it is not named close-callback is that it is used only
> when the xlogreader-state is destroyed. So I'm fine with
> read/close-callbacks.

We can revisit the current design in the future.  For example, for
encryption we might decide to remove the current-open-segment FD from
XLogReaderState, and then things might be different.  (I think the
current design is based a lot on historical code, rather than being
optimal.)

Since your objection isn't strong, I propose to commit the same patch as
before, only rebased to resolve a conflict with cd123234404e, plus this
comment preceding WALRead:

/*
 * Helper function to ease writing of XLogReaderRoutine->page_read callbacks.
 * If this function is used, caller must supply a segment_open callback in
 * 'state', as that is used here.
 * [... rest is same as before ...]

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From fbe94ae201bc3963c71be65cb30f820da9591aeb Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 23 Apr 2020 18:02:20 -0400
Subject: [PATCH v2] Rework XLogReader callback system

segment_open and segment_close are passed in at XLogReaderAllocate time,
together with page_read; add comment to explain the role of WALRead.
---
 src/backend/access/transam/twophase.c     |   5 +-
 src/backend/access/transam/xlog.c         |  10 +-
 src/backend/access/transam/xlogreader.c   |  51 ++++----
 src/backend/access/transam/xlogutils.c    |  24 ++--
 src/backend/replication/logical/logical.c |  20 +--
 .../replication/logical/logicalfuncs.c    |   4 +-
 src/backend/replication/slotfuncs.c       |  10 +-
 src/backend/replication/walsender.c       |  37 ++++--
 src/bin/pg_rewind/parsexlog.c             |   9 +-
 src/bin/pg_waldump/pg_waldump.c           |  30 +++--
 src/include/access/xlogreader.h           | 114 +++++++++++-------
 src/include/access/xlogutils.h            |   5 +
 src/include/replication/logical.h         |   4 +-
 13 files changed, 210 insertions(+), 113 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 2f7d4ed59a..e1904877fa 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1331,7 +1331,10 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 	char	   *errormsg;
 
 	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
-									&read_local_xlog_page, NULL);
+									XL_ROUTINE(.page_read = &read_local_xlog_page,
+											   .segment_open = &wal_segment_open,
+											   .segment_close = &wal_segment_close),
+									NULL);
 	if (!xlogreader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0d3d670928..a53e6d9633 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1223,7 +1223,7 @@ XLogInsertRecord(XLogRecData *rdata,
 
 		if (!debug_reader)
 			debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
-											  NULL, NULL);
+											  XL_ROUTINE(), NULL);
 
 		if (!debug_reader)
 		{
@@ -6478,8 +6478,12 @@ StartupXLOG(void)
 
 	/* Set up XLOG reader facility */
 	MemSet(&private, 0, sizeof(XLogPageReadPrivate));
-	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
-									&XLogPageRead, &private);
+	xlogreader =
+		XLogReaderAllocate(wal_segment_size, NULL,
+						   XL_ROUTINE(.page_read = &XLogPageRead,
+									  .segment_open = NULL,
+									  .segment_close = wal_segment_close),
+						   &private);
 	if (!xlogreader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 79ff976474..7cee8b92c9 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -71,7 +71,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
  */
 XLogReaderState *
 XLogReaderAllocate(int wal_segment_size, const char *waldir,
-				   XLogPageReadCB pagereadfunc, void *private_data)
+				   XLogReaderRoutine *routine, void *private_data)
 {
 	XLogReaderState *state;
 
@@ -81,6 +81,9 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
 	if (!state)
 		return NULL;
 
+	/* initialize caller-provided support functions */
+	state->routine = *routine;
+
 	state->max_block_id = -1;
 
 	/*
@@ -102,7 +105,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
 	WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
 					   waldir);
 
-	state->read_page = pagereadfunc;
 	/* system_identifier initialized to zeroes above */
 	state->private_data = private_data;
 	/* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
@@ -137,7 +139,7 @@ XLogReaderFree(XLogReaderState *state)
 	int			block_id;
 
 	if (state->seg.ws_file != -1)
-		close(state->seg.ws_file);
+		state->routine.segment_close(state);
 
 	for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
 	{
@@ -250,7 +252,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
  * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
  * to XLogReadRecord().
  *
- * If the read_page callback fails to read the requested data, NULL is
+ * If the page_read callback fails to read the requested data, NULL is
  * returned.  The callback is expected to have reported the error; errormsg
  * is set to NULL.
  *
@@ -559,10 +561,10 @@ err:
 
 /*
  * Read a single xlog page including at least [pageptr, reqLen] of valid data
- * via the read_page() callback.
+ * via the page_read() callback.
  *
  * Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the read_page callback).
+ * is set in that case (unless the error occurs in the page_read callback).
  *
  * We fetch the page from a reader-local cache if we know we have the required
  * data and if there hasn't been any error since caching the data.
@@ -589,7 +591,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 * Data is not in our buffer.
 	 *
 	 * Every time we actually read the segment, even if we looked at parts of
-	 * it before, we need to do verification as the read_page callback might
+	 * it before, we need to do verification as the page_read callback might
 	 * now be rereading data from a different source.
 	 *
 	 * Whenever switching to a new WAL segment, we read the first page of the
@@ -601,9 +603,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	{
 		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
 
-		readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
-								   state->currRecPtr,
-								   state->readBuf);
+		readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
+										   state->currRecPtr,
+										   state->readBuf);
 		if (readLen < 0)
 			goto err;
 
@@ -619,9 +621,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	 * First, read the requested data length, but at least a short page header
 	 * so that we can validate it.
 	 */
-	readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
-							   state->currRecPtr,
-							   state->readBuf);
+	readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
+									   state->currRecPtr,
+									   state->readBuf);
 	if (readLen < 0)
 		goto err;
 
@@ -638,9 +640,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
 	/* still not enough */
 	if (readLen < XLogPageHeaderSize(hdr))
 	{
-		readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
-								   state->currRecPtr,
-								   state->readBuf);
+		readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
+										   state->currRecPtr,
+										   state->readBuf);
 		if (readLen < 0)
 			goto err;
 	}
@@ -1041,11 +1043,14 @@ err:
 #endif							/* FRONTEND */
 
 /*
+ * Helper function to ease writing of XLogReaderRoutine->page_read callbacks.
+ * If this function is used, caller must supply a segment_open callback in
+ * 'state', as that is used here.
+ *
  * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
  * fetched from timeline 'tli'.
  *
- * 'seg/segcxt' identify the last segment used.  'openSegment' is a callback
- * to open the next segment, if necessary.
+ * 'seg/segcxt' identify the last segment used.
  *
  * Returns true if succeeded, false if an error occurs, in which case
  * 'errinfo' receives error details.
@@ -1054,9 +1059,10 @@ err:
  * WAL buffers when possible.
  */
 bool
-WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
+WALRead(XLogReaderState *state,
+		char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
 		WALOpenSegment *seg, WALSegmentContext *segcxt,
-		WALSegmentOpen openSegment, WALReadError *errinfo)
+		WALReadError *errinfo)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
@@ -1086,10 +1092,11 @@ WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
 			XLogSegNo	nextSegNo;
 
 			if (seg->ws_file >= 0)
-				close(seg->ws_file);
+				state->routine.segment_close(state);
 
 			XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
-			seg->ws_file = openSegment(nextSegNo, segcxt, &tli);
+			seg->ws_file = state->routine.segment_open(state, nextSegNo,
+													   segcxt, &tli);
 
 			/* Update the current segment info. */
 			seg->ws_tli = tli;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 6cb143e161..bbd801513a 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -783,10 +783,10 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 	}
 }
 
-/* openSegment callback for WALRead */
-static int
-wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
-				 TimeLineID *tli_p)
+/* XLogReaderRoutine->segment_open callback for local pg_wal files */
+int
+wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
+				 WALSegmentContext *segcxt, TimeLineID *tli_p)
 {
 	TimeLineID	tli = *tli_p;
 	char		path[MAXPGPATH];
@@ -811,8 +811,17 @@ wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
 	return -1;					/* keep compiler quiet */
 }
 
+/* stock XLogReaderRoutine->segment_close callback */
+void
+wal_segment_close(XLogReaderState *state)
+{
+	close(state->seg.ws_file);
+	/* need to check errno? */
+	state->seg.ws_file = -1;
+}
+
 /*
- * read_page callback for reading local xlog files
+ * XLogReaderRoutine->page_read callback for reading local xlog files
  *
  * Public because it would likely be very helpful for someone writing another
  * output method outside walsender, e.g. in a bgworker.
@@ -937,8 +946,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	 * as 'count', read the whole page anyway. It's guaranteed to be
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
-	if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg,
-				 &state->segcxt, wal_segment_open, &errinfo))
+	if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
+				 &state->seg, &state->segcxt,
+				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
 	/* number of valid bytes in the buffer */
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5adf253583..dc69e5ce5f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -120,7 +120,7 @@ StartupDecodingContext(List *output_plugin_options,
 					   TransactionId xmin_horizon,
 					   bool need_full_snapshot,
 					   bool fast_forward,
-					   XLogPageReadCB read_page,
+					   XLogReaderRoutine *xl_routine,
 					   LogicalOutputPluginWriterPrepareWrite prepare_write,
 					   LogicalOutputPluginWriterWrite do_write,
 					   LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -169,7 +169,7 @@ StartupDecodingContext(List *output_plugin_options,
 
 	ctx->slot = slot;
 
-	ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx);
+	ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
 	if (!ctx->reader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -215,7 +215,8 @@ StartupDecodingContext(List *output_plugin_options,
  * Otherwise, we set for decoding to start from the given LSN without
  * marking WAL reserved beforehand.  In that scenario, it's up to the
  * caller to guarantee that WAL remains available.
- * read_page, prepare_write, do_write, update_progress --
+ * xl_routine -- XLogReaderRoutine for underlying XLogReader
+ * prepare_write, do_write, update_progress --
  *		callbacks that perform the use-case dependent, actual, work.
  *
  * Needs to be called while in a memory context that's at least as long lived
@@ -230,7 +231,7 @@ CreateInitDecodingContext(char *plugin,
 						  List *output_plugin_options,
 						  bool need_full_snapshot,
 						  XLogRecPtr restart_lsn,
-						  XLogPageReadCB read_page,
+						  XLogReaderRoutine *xl_routine,
 						  LogicalOutputPluginWriterPrepareWrite prepare_write,
 						  LogicalOutputPluginWriterWrite do_write,
 						  LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -327,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
 
 	ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
 								 need_full_snapshot, false,
-								 read_page, prepare_write, do_write,
+								 xl_routine, prepare_write, do_write,
 								 update_progress);
 
 	/* call output plugin initialization callback */
@@ -357,7 +358,10 @@ CreateInitDecodingContext(char *plugin,
  * fast_forward
  *		bypass the generation of logical changes.
  *
- * read_page, prepare_write, do_write, update_progress
+ * xl_routine
+ *		XLogReaderRoutine used by underlying xlogreader
+ *
+ * prepare_write, do_write, update_progress
  *		callbacks that have to be filled to perform the use-case dependent,
  *		actual work.
  *
@@ -372,7 +376,7 @@ LogicalDecodingContext *
 CreateDecodingContext(XLogRecPtr start_lsn,
 					  List *output_plugin_options,
 					  bool fast_forward,
-					  XLogPageReadCB read_page,
+					  XLogReaderRoutine *xl_routine,
 					  LogicalOutputPluginWriterPrepareWrite prepare_write,
 					  LogicalOutputPluginWriterWrite do_write,
 					  LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -425,7 +429,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
 	ctx = StartupDecodingContext(output_plugin_options,
 								 start_lsn, InvalidTransactionId, false,
-								 fast_forward, read_page, prepare_write,
+								 fast_forward, xl_routine, prepare_write,
 								 do_write, update_progress);
 
 	/* call output plugin initialization callback */
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index fded8e8290..b99c94e848 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -233,7 +233,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 		ctx = CreateDecodingContext(InvalidXLogRecPtr,
 									options,
 									false,
-									read_local_xlog_page,
+									XL_ROUTINE(.page_read = read_local_xlog_page,
+											   .segment_open = wal_segment_open,
+											   .segment_close = wal_segment_close),
 									LogicalOutputPrepareWrite,
 									LogicalOutputWrite, NULL);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ae751e94e7..26890dffb4 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -152,8 +152,10 @@ create_logical_replication_slot(char *name, char *plugin,
 	ctx = CreateInitDecodingContext(plugin, NIL,
 									false,	/* just catalogs is OK */
 									restart_lsn,
-									read_local_xlog_page, NULL, NULL,
-									NULL);
+									XL_ROUTINE(.page_read = read_local_xlog_page,
+											   .segment_open = wal_segment_open,
+											   .segment_close = wal_segment_close),
+									NULL, NULL, NULL);
 
 	/*
 	 * If caller needs us to determine the decoding start point, do so now.
@@ -464,7 +466,9 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 		ctx = CreateDecodingContext(InvalidXLogRecPtr,
 									NIL,
 									true,	/* fast_forward */
-									read_local_xlog_page,
+									XL_ROUTINE(.page_read = read_local_xlog_page,
+											   .segment_open = wal_segment_open,
+											   .segment_close = wal_segment_close),
 									NULL, NULL, NULL);
 
 		/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 8b55bbfcb2..e2204d80ee 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -54,8 +54,8 @@
 #include "access/transam.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
+#include "access/xlogreader.h"
 #include "access/xlogutils.h"
-
 #include "catalog/pg_authid.h"
 #include "catalog/pg_type.h"
 #include "commands/dbcommands.h"
@@ -248,8 +248,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
-static int	WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
-							  TimeLineID *tli_p);
+static int	WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
+							  WALSegmentContext *segcxt, TimeLineID *tli_p);
 static void UpdateSpillStats(LogicalDecodingContext *ctx);
 
 
@@ -798,7 +798,8 @@ StartReplication(StartReplicationCmd *cmd)
 }
 
 /*
- * read_page callback for logical decoding contexts, as a walsender process.
+ * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
+ * walsender process.
  *
  * Inside the walsender we can do better than read_local_xlog_page,
  * which has to do a plain sleep/busy loop, because the walsender's latch gets
@@ -832,7 +833,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 		count = flushptr - targetPagePtr;	/* part of the page available */
 
 	/* now actually read the data, we know it's there */
-	if (!WALRead(cur_page,
+	if (!WALRead(state,
+				 cur_page,
 				 targetPagePtr,
 				 XLOG_BLCKSZ,
 				 sendSeg->ws_tli,	/* Pass the current TLI because only
@@ -840,7 +842,6 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 									 * TLI is needed. */
 				 sendSeg,
 				 sendCxt,
-				 WalSndSegmentOpen,
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
@@ -1005,7 +1006,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
 										InvalidXLogRecPtr,
-										logical_read_xlog_page,
+										XL_ROUTINE(.page_read = logical_read_xlog_page,
+												   .segment_open = WalSndSegmentOpen,
+												   .segment_close = wal_segment_close),
 										WalSndPrepareWrite, WalSndWriteData,
 										WalSndUpdateProgress);
 
@@ -1168,7 +1171,9 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 */
 	logical_decoding_ctx =
 		CreateDecodingContext(cmd->startpoint, cmd->options, false,
-							  logical_read_xlog_page,
+							  XL_ROUTINE(.page_read = logical_read_xlog_page,
+										 .segment_open = WalSndSegmentOpen,
+										 .segment_close = wal_segment_close),
 							  WalSndPrepareWrite, WalSndWriteData,
 							  WalSndUpdateProgress);
 
@@ -2441,9 +2446,10 @@ WalSndKill(int code, Datum arg)
 	SpinLockRelease(&walsnd->mutex);
 }
 
-/* walsender's openSegment callback for WALRead */
+/* XLogReaderRoutine->segment_open callback */
 static int
-WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+WalSndSegmentOpen(XLogReaderState *state,
+				  XLogSegNo nextSegNo, WALSegmentContext *segcxt,
 				  TimeLineID *tli_p)
 {
 	char		path[MAXPGPATH];
@@ -2531,6 +2537,12 @@ XLogSendPhysical(void)
 	Size		nbytes;
 	XLogSegNo	segno;
 	WALReadError errinfo;
+	static XLogReaderState fake_xlogreader =
+	{
+		/* XXX no page_read routine used by physical walsender */
+		.routine.segment_open = WalSndSegmentOpen,
+		.routine.segment_close = wal_segment_close
+	};
 
 	/* If requested switch the WAL sender to the stopping state. */
 	if (got_STOPPING)
@@ -2748,7 +2760,9 @@ XLogSendPhysical(void)
 	enlargeStringInfo(&output_message, nbytes);
 
 retry:
-	if (!WALRead(&output_message.data[output_message.len],
+	/* XXX for xlogreader use, we'd call XLogBeginRead+XLogReadRecord here */
+	if (!WALRead(&fake_xlogreader,
+				 &output_message.data[output_message.len],
 				 startptr,
 				 nbytes,
 				 sendSeg->ws_tli,	/* Pass the current TLI because only
@@ -2756,7 +2770,6 @@ retry:
 									 * TLI is needed. */
 				 sendSeg,
 				 sendCxt,
-				 WalSndSegmentOpen,
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index c51b5db315..d637f5eb77 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -66,7 +66,8 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
 
 	private.tliIndex = tliIndex;
 	private.restoreCommand = restoreCommand;
-	xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+	xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+									XL_ROUTINE(.page_read = &SimpleXLogPageRead),
 									&private);
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
@@ -117,7 +118,8 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex,
 
 	private.tliIndex = tliIndex;
 	private.restoreCommand = restoreCommand;
-	xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+	xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+									XL_ROUTINE(.page_read = &SimpleXLogPageRead),
 									&private);
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
@@ -176,7 +178,8 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
 
 	private.tliIndex = tliIndex;
 	private.restoreCommand = restoreCommand;
-	xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+	xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+									XL_ROUTINE(.page_read = &SimpleXLogPageRead),
 									&private);
 	if (xlogreader == NULL)
 		pg_fatal("out of memory");
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index d7bd9ccac2..e29f65500f 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -279,9 +279,10 @@ identify_target_directory(char *directory, char *fname)
 	return NULL;				/* not reached */
 }
 
-/* pg_waldump's openSegment callback for WALRead */
+/* pg_waldump's XLogReaderRoutine->segment_open callback */
 static int
-WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+WALDumpOpenSegment(XLogReaderState *state,
+				   XLogSegNo nextSegNo, WALSegmentContext *segcxt,
 				   TimeLineID *tli_p)
 {
 	TimeLineID	tli = *tli_p;
@@ -321,8 +322,18 @@ WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
 }
 
 /*
- * XLogReader read_page callback
+ * pg_waldump's XLogReaderRoutine->segment_close callback.  Same as
+ * wal_segment_close
  */
+static void
+WALDumpCloseSegment(XLogReaderState *state)
+{
+	close(state->seg.ws_file);
+	/* need to check errno? */
+	state->seg.ws_file = -1;
+}
+
+/* pg_waldump's XLogReaderRoutine->page_read callback */
 static int
 WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 				XLogRecPtr targetPtr, char *readBuff)
@@ -344,8 +355,9 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 		}
 	}
 
-	if (!WALRead(readBuff, targetPagePtr, count, private->timeline,
-				 &state->seg, &state->segcxt, WALDumpOpenSegment, &errinfo))
+	if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
+				 &state->seg, &state->segcxt,
+				 &errinfo))
 	{
 		WALOpenSegment *seg = &errinfo.wre_seg;
 		char		fname[MAXPGPATH];
@@ -1031,8 +1043,12 @@ main(int argc, char **argv)
 	/* done with argument parsing, do the actual work */
 
 	/* we have everything we need, start reading */
-	xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, WALDumpReadPage,
-										  &private);
+	xlogreader_state =
+		XLogReaderAllocate(WalSegSz, waldir,
+						   XL_ROUTINE(.page_read = WALDumpReadPage,
+									  .segment_open = WALDumpOpenSegment,
+									  .segment_close = WALDumpCloseSegment),
+						   &private);
 	if (!xlogreader_state)
 		fatal_error("out of memory");
 
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 4582196e18..6b9f7db646 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -17,6 +17,13 @@
  *		XLogBeginRead() or XLogFindNextRecord(), and call XLogReadRecord()
  *		until it returns NULL.
  *
+ *		Callers supply a page_read callback if they want to call
+ *		XLogReadRecord or XLogFindNextRecord; it can be passed in as NULL
+ *		otherwise.  The WALRead function can be used as a helper to write
+ *		page_read callbacks, but it is not mandatory; callers that use it
+ *		must supply segment_open callbacks.  The segment_close callback
+ *		must always be supplied.
+ *
  *		After reading a record with XLogReadRecord(), it's decomposed into
  *		the per-block and main data parts, and the parts can be accessed
  *		with the XLogRec* macros and functions. You can also decode a
You can also decode a @@ -50,12 +57,64 @@ typedef struct WALSegmentContext typedef struct XLogReaderState XLogReaderState; -/* Function type definition for the read_page callback */ +/* Function type definitions for various xlogreader interactions */ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf); +typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader, + XLogSegNo nextSegNo, + WALSegmentContext *segcxt, + TimeLineID *tli_p); +typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader); + +typedef struct XLogReaderRoutine +{ + /* + * Data input callback + * + * This callback shall read at least reqLen valid bytes of the xlog page + * starting at targetPagePtr, and store them in readBuf. The callback + * shall return the number of bytes read (never more than XLOG_BLCKSZ), or + * -1 on failure. The callback shall sleep, if necessary, to wait for the + * requested bytes to become available. The callback will not be invoked + * again for the same page unless more than the returned number of bytes + * are needed. + * + * targetRecPtr is the position of the WAL record we're reading. Usually + * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs + * to read and verify the page or segment header, before it reads the + * actual WAL record it's interested in. In that case, targetRecPtr can + * be used to determine which timeline to read the page from. + * + * The callback shall set ->seg.ws_tli to the TLI of the file the page was + * read from. + */ + XLogPageReadCB page_read; + + /* + * Callback to open the specified WAL segment for reading. Returns a + * valid file descriptor when the file was opened successfully. + * + * "nextSegNo" is the number of the segment to be opened. + * + * "segcxt" is additional information about the segment. + * + * "tli_p" is an input/output argument. 
XLogRead() uses it to pass the + * timeline in which the new segment should be found, but the callback can + * use it to return the TLI that it actually opened. + * + * BasicOpenFile() is the preferred way to open the segment file in + * backend code, whereas open(2) should be used in frontend. + */ + WALSegmentOpenCB segment_open; + + /* WAL segment close callback */ + WALSegmentCloseCB segment_close; +} XLogReaderRoutine; + +#define XL_ROUTINE(...) &(XLogReaderRoutine){__VA_ARGS__} typedef struct { @@ -88,33 +147,16 @@ typedef struct struct XLogReaderState { + /* + * Operational callbacks + */ + XLogReaderRoutine routine; + /* ---------------------------------------- * Public parameters * ---------------------------------------- */ - /* - * Data input callback (mandatory). - * - * This callback shall read at least reqLen valid bytes of the xlog page - * starting at targetPagePtr, and store them in readBuf. The callback - * shall return the number of bytes read (never more than XLOG_BLCKSZ), or - * -1 on failure. The callback shall sleep, if necessary, to wait for the - * requested bytes to become available. The callback will not be invoked - * again for the same page unless more than the returned number of bytes - * are needed. - * - * targetRecPtr is the position of the WAL record we're reading. Usually - * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs - * to read and verify the page or segment header, before it reads the - * actual WAL record it's interested in. In that case, targetRecPtr can - * be used to determine which timeline to read the page from. - * - * The callback shall set ->seg.ws_tli to the TLI of the file the page was - * read from. - */ - XLogPageReadCB read_page; - /* * System identifier of the xlog files we're about to read. Set to zero * (the default value) if unknown or unimportant. 
@@ -214,30 +256,13 @@ struct XLogReaderState
 /* Get a new XLogReader */
 extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
 										   const char *waldir,
-										   XLogPageReadCB pagereadfunc,
+										   XLogReaderRoutine *routine,
 										   void *private_data);
+extern XLogReaderRoutine *LocalXLogReaderRoutine(void);
 
 /* Free an XLogReader */
 extern void XLogReaderFree(XLogReaderState *state);
 
-/*
- * Callback to open the specified WAL segment for reading.  Returns a valid
- * file descriptor when the file was opened successfully.
- *
- * "nextSegNo" is the number of the segment to be opened.
- *
- * "segcxt" is additional information about the segment.
- *
- * "tli_p" is an input/output argument. XLogRead() uses it to pass the
- * timeline in which the new segment should be found, but the callback can use
- * it to return the TLI that it actually opened.
- *
- * BasicOpenFile() is the preferred way to open the segment file in backend
- * code, whereas open(2) should be used in frontend.
- */
-typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
-							   TimeLineID *tli_p);
-
 /* Initialize supporting structures */
 extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 							   int segsize, const char *waldir);
@@ -269,9 +294,10 @@ typedef struct WALReadError
 	WALOpenSegment wre_seg;		/* Segment we tried to read from. */
 } WALReadError;
 
-extern bool WALRead(char *buf, XLogRecPtr startptr, Size count,
+extern bool WALRead(XLogReaderState *state,
+					char *buf, XLogRecPtr startptr, Size count,
 					TimeLineID tli, WALOpenSegment *seg,
-					WALSegmentContext *segcxt, WALSegmentOpen openSegment,
+					WALSegmentContext *segcxt,
 					WALReadError *errinfo);
 
 /* Functions for decoding an XLogRecord */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 5181a077d9..68ce815476 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -50,6 +50,11 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
 extern int	read_local_xlog_page(XLogReaderState *state,
 								 XLogRecPtr targetPagePtr, int reqLen,
 								 XLogRecPtr targetRecPtr, char *cur_page);
+extern int	wal_segment_open(XLogReaderState *state,
+							 XLogSegNo nextSegNo,
+							 WALSegmentContext *segcxt,
+							 TimeLineID *tli_p);
+extern void wal_segment_close(XLogReaderState *state);
 
 extern void XLogReadDetermineTimeline(XLogReaderState *state,
 									  XLogRecPtr wantPage, uint32 wantLength);
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 3b7ca7f1da..c2f2475e5d 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -95,14 +95,14 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
 														 List *output_plugin_options,
 														 bool need_full_snapshot,
 														 XLogRecPtr restart_lsn,
-														 XLogPageReadCB read_page,
+														 XLogReaderRoutine *xl_routine,
 														 LogicalOutputPluginWriterPrepareWrite prepare_write,
 														 LogicalOutputPluginWriterWrite do_write,
 														 LogicalOutputPluginWriterUpdateProgress update_progress);
 extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
 													 List *output_plugin_options,
 													 bool fast_forward,
-													 XLogPageReadCB read_page,
+													 XLogReaderRoutine *xl_routine,
 													 LogicalOutputPluginWriterPrepareWrite prepare_write,
 													 LogicalOutputPluginWriterWrite do_write,
 													 LogicalOutputPluginWriterUpdateProgress update_progress);
-- 
2.20.1