On 22/08/2019 04:43, Kyotaro Horiguchi wrote:
At Mon, 29 Jul 2019 22:39:57 +0300, Heikki Linnakangas <hlinn...@iki.fi> wrote in
<e1ecb53b-663d-98ed-2249-dfa30a74f...@iki.fi>
On 12/07/2019 10:10, Kyotaro Horiguchi wrote:
* XLogReaderState->readBuf is now allocated and controlled by the
* caller, not by xlogreader.c itself. When XLogReadRecord() needs data,
* the caller makes the data available in readBuf, which can point to the
* same buffer in all calls, or the caller may allocate a new buffer, or
* it may point to a part of a larger buffer, whatever is convenient for
* the caller. (Currently, all callers just allocate a BLCKSZ'd buffer,
though.) The caller also sets readPagePtr, readLen and readPageTLI to
* tell XLogReadRecord() what's in the buffer. So all these read* fields
* are now set by the caller, XLogReadRecord() only reads them.
The caller knows how many bytes are to be read, so having the caller
provide the required buffer seems reasonable.
I also had in mind that the caller could provide a larger buffer,
spanning multiple pages, in one XLogReadRecord() call. It might be
convenient to load a whole WAL file in memory and pass it to
XLogReadRecord() in one call, for example. I think the interface would
now allow that, but the code won't actually take advantage of that.
XLogReadRecord() will always ask for one page at a time, and I think it
will ask the caller for more data between each page, even if the caller
supplies more than one page in one call.
I'm not sure how intelligible this patch is in its current state. But
I think the general idea is good. I plan to clean it up further next
week, but feel free to work on it before that, either based on this
patch or by starting afresh from your patch set.
I think your diff is intelligible enough for me. I'll take this over if
you haven't done so already. Anyway, I'm starting on this.
Thanks! I did actually spend some time on this last week, but got
distracted by something else before finishing it up and posting a patch.
Here's a snapshot of what I have in my local branch. It seems to pass
"make check-world", but probably needs some more cleanup.
Main changes since last version:
* I changed the interface so that there is a new function to set the
starting position, XLogBeginRead(), and XLogReadRecord() always
continues from where it left off. I think that's more clear than having
a starting point argument in XLogReadRecord(), which was only set on the
first call. It makes the calling code more clear, too, IMO.
* Refactored the implementation of XLogFindNextRecord().
XLogFindNextRecord() is now a sibling function of XLogBeginRead(). It
sets the starting point like XLogBeginRead(). The difference is that
with XLogFindNextRecord(), the starting point doesn't need to point to a
valid record, it will "fast forward" to the next valid record after the
point. The "fast forwarding" is done in an extra state in the state
machine in XLogReadRecord().
* I refactored XLogReadRecord() and the internal XLogNeedData()
function. XLogNeedData() used to contain logic for verifying segment and
page headers. That works quite differently now. Checking the segment
header is now a new state in the state machine, and the page header is
verified at the top of XLogReadRecord(), whenever the caller provides
new data. I think that makes the code in XLogReadRecord() more clear.
- Heikki
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 477709bbc23..8ecb5ea55c5 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1386,15 +1386,21 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
XLogReaderState *xlogreader;
char *errormsg;
- xlogreader = XLogReaderAllocate(wal_segment_size, &read_local_xlog_page,
- NULL);
+ xlogreader = XLogReaderAllocate(wal_segment_size);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory"),
errdetail("Failed while allocating a WAL reading processor.")));
+ xlogreader->readBuf = palloc(XLOG_BLCKSZ);
+
+ XLogBeginRead(xlogreader, lsn);
+ while (XLogReadRecord(xlogreader, &record, &errormsg) ==
+ XLREAD_NEED_DATA)
+ {
+ read_local_xlog_page(xlogreader);
+ }
- record = XLogReadRecord(xlogreader, lsn, &errormsg);
if (record == NULL)
ereport(ERROR,
(errcode_for_file_access(),
@@ -1416,6 +1422,7 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
*buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
+ pfree(xlogreader->readBuf);
XLogReaderFree(xlogreader);
}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e651a841bbe..1bb303a90dc 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -803,13 +803,6 @@ static XLogSource readSource = 0; /* XLOG_FROM_* code */
static XLogSource currentSource = 0; /* XLOG_FROM_* code */
static bool lastSourceFailed = false;
-typedef struct XLogPageReadPrivate
-{
- int emode;
- bool fetching_ckpt; /* are we fetching a checkpoint record? */
- bool randAccess;
-} XLogPageReadPrivate;
-
/*
* These variables track when we last obtained some WAL data to process,
* and where we got it from. (XLogReceiptSource is initially the same as
@@ -884,9 +877,8 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
int source, bool notfoundOk);
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
-static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
- TimeLineID *readTLI);
+static void XLogPageRead(XLogReaderState *xlogreader,
+ bool fetching_ckpt, int emode, bool randAccess);
static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
bool fetching_ckpt, XLogRecPtr tliRecPtr);
static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
@@ -1195,7 +1187,7 @@ XLogInsertRecord(XLogRecData *rdata,
appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
if (!debug_reader)
- debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
+ debug_reader = XLogReaderAllocate(wal_segment_size);
if (!debug_reader)
{
@@ -4247,21 +4239,29 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
bool fetching_ckpt)
{
XLogRecord *record;
- XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
-
- /* Pass through parameters to XLogPageRead */
- private->fetching_ckpt = fetching_ckpt;
- private->emode = emode;
- private->randAccess = (RecPtr != InvalidXLogRecPtr);
+ bool randAccess = (RecPtr != InvalidXLogRecPtr);
/* This is the first attempt to read this page. */
lastSourceFailed = false;
+ if (RecPtr != InvalidXLogRecPtr)
+ XLogBeginRead(xlogreader, RecPtr);
+
for (;;)
{
char *errormsg;
- record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
+ while (XLogReadRecord(xlogreader, &record, &errormsg)
+ == XLREAD_NEED_DATA)
+ {
+ XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess);
+ if (xlogreader->readLen == -1)
+ {
+ record = NULL;
+ break;
+ }
+ }
+
ReadRecPtr = xlogreader->ReadRecPtr;
EndRecPtr = xlogreader->EndRecPtr;
if (record == NULL)
@@ -4271,6 +4271,8 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
close(readFile);
readFile = -1;
}
+ /* Force the WAL page to be reloaded. */
+ xlogreader->readLen = -1;
/*
* We only end up here without a message when XLogPageRead()
@@ -6211,7 +6213,6 @@ StartupXLOG(void)
bool backupFromStandby = false;
DBState dbstate_at_startup;
XLogReaderState *xlogreader;
- XLogPageReadPrivate private;
bool fast_promoted = false;
struct stat st;
@@ -6352,13 +6353,13 @@ StartupXLOG(void)
OwnLatch(&XLogCtl->recoveryWakeupLatch);
/* Set up XLOG reader facility */
- MemSet(&private, 0, sizeof(XLogPageReadPrivate));
- xlogreader = XLogReaderAllocate(wal_segment_size, &XLogPageRead, &private);
+ xlogreader = XLogReaderAllocate(wal_segment_size);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory"),
errdetail("Failed while allocating a WAL reading processor.")));
+ xlogreader->readBuf = palloc(XLOG_BLCKSZ);
xlogreader->system_identifier = ControlFile->system_identifier;
/*
@@ -7730,6 +7731,7 @@ StartupXLOG(void)
close(readFile);
readFile = -1;
}
+ pfree(xlogreader->readBuf);
XLogReaderFree(xlogreader);
/*
@@ -11521,13 +11523,15 @@ CancelBackup(void)
* XLogPageRead() to try fetching the record from another source, or to
* sleep and retry.
*/
-static int
-XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
- XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI)
-{
- XLogPageReadPrivate *private =
- (XLogPageReadPrivate *) xlogreader->private_data;
- int emode = private->emode;
+static void
+XLogPageRead(XLogReaderState *xlogreader,
+ bool fetching_ckpt, int emode, bool randAccess)
+{
+ XLogRecPtr targetPagePtr = xlogreader->loadPagePtr;
+ int reqLen = xlogreader->loadLen;
+ XLogRecPtr targetRecPtr = xlogreader->ReadRecPtr;
+ char *readBuf = xlogreader->readBuf;
+ TimeLineID *readTLI = &xlogreader->readPageTLI;
uint32 targetPageOff;
XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY;
int r;
@@ -11570,8 +11574,8 @@ retry:
receivedUpto < targetPagePtr + reqLen))
{
if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
- private->randAccess,
- private->fetching_ckpt,
+ randAccess,
+ fetching_ckpt,
targetRecPtr))
{
if (readFile >= 0)
@@ -11580,7 +11584,9 @@ retry:
readLen = 0;
readSource = 0;
- return -1;
+ xlogreader->readPagePtr = InvalidXLogRecPtr;
+ xlogreader->readLen = -1;
+ return;
}
}
@@ -11675,7 +11681,9 @@ retry:
goto next_record_is_invalid;
}
- return readLen;
+ xlogreader->readPagePtr = targetPagePtr;
+ xlogreader->readLen = readLen;
+ return;
next_record_is_invalid:
lastSourceFailed = true;
@@ -11689,8 +11697,10 @@ next_record_is_invalid:
/* In standby-mode, keep trying */
if (StandbyMode)
goto retry;
- else
- return -1;
+
+ xlogreader->readPagePtr = InvalidXLogRecPtr;
+ xlogreader->readLen = -1;
+ return;
}
/*
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index c6faf48d24f..a7a06c0c6aa 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -30,14 +30,46 @@
#include "utils/memutils.h"
#endif
+/*
+ * XLogBeginRead() puts the reader into XLREAD_NEED_SEGMENT_HEADER state, so
+ * that the first call to XLogReadRecord() will read and validate the segment
+ * header. After that, we loop through NEED_TOT_LEN -> NEED_CONTINUATION (only
+ * if the record spans pages) -> NEXT_RECORD -> NEED_TOT_LEN, and so on.
+ */
+enum XLogReadRecordState
+{
+ XLREAD_NEED_SEGMENT_HEADER,
+ XLREAD_SKIP_CONTRECORDS,
+
+ /* between records */
+ XLREAD_NEXT_RECORD,
+
+ /*
+ * ReadRecPtr has been set to point to beginning of record, nothing has
+ * been read yet.
+ */
+ XLREAD_NEED_TOT_LEN,
+
+ /*
+ * We have read a record partially, but need to read one or more continuation
+ * records to complete it. recordGotLen indicates how much we have read
+ * already, and recordRemainLen indicates how much is left. readRecordBuf holds
+ * the partially assembled record. recordContRecPtr points to the beginning
+ * of the next page where to continue.
+ */
+ XLREAD_NEED_CONTINUATION
+};
+
static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
- XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
+ XLogRecPtr PrevRecPtr, XLogRecord *record);
static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
XLogRecPtr recptr);
-static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
- int reqLen);
+static bool XLogNeedPage(XLogReaderState *state, XLogRecPtr pageptr);
+static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr,
+ int reqLen);
+static void XLogReaderInvalReadState(XLogReaderState *state);
static void report_invalid_record(XLogReaderState *state, const char *fmt,...) pg_attribute_printf(2, 3);
static void ResetDecoder(XLogReaderState *state);
@@ -67,8 +99,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
* Returns NULL if the xlogreader couldn't be allocated.
*/
XLogReaderState *
-XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
- void *private_data)
+XLogReaderAllocate(int wal_segment_size)
{
XLogReaderState *state;
@@ -80,32 +111,12 @@ XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
state->max_block_id = -1;
- /*
- * Permanently allocate readBuf. We do it this way, rather than just
- * making a static array, for two reasons: (1) no need to waste the
- * storage in most instantiations of the backend; (2) a static char array
- * isn't guaranteed to have any particular alignment, whereas
- * palloc_extended() will provide MAXALIGN'd storage.
- */
- state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ,
- MCXT_ALLOC_NO_OOM);
- if (!state->readBuf)
- {
- pfree(state);
- return NULL;
- }
-
state->wal_segment_size = wal_segment_size;
- state->read_page = pagereadfunc;
- /* system_identifier initialized to zeroes above */
- state->private_data = private_data;
- /* ReadRecPtr and EndRecPtr initialized to zeroes above */
- /* readSegNo, readOff, readLen, readPageTLI initialized to zeroes above */
+ /* All members are initialized to zeroes above */
state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
MCXT_ALLOC_NO_OOM);
if (!state->errormsg_buf)
{
- pfree(state->readBuf);
pfree(state);
return NULL;
}
@@ -118,11 +129,12 @@ XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
if (!allocate_recordbuf(state, 0))
{
pfree(state->errormsg_buf);
- pfree(state->readBuf);
pfree(state);
return NULL;
}
+ XLogReaderInvalReadState(state);
+
return state;
}
@@ -142,7 +154,6 @@ XLogReaderFree(XLogReaderState *state)
pfree(state->errormsg_buf);
if (state->readRecordBuf)
pfree(state->readRecordBuf);
- pfree(state->readBuf);
pfree(state);
}
@@ -197,306 +208,590 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
return true;
}
+
/*
- * Attempt to read an XLOG record.
- *
- * If RecPtr is valid, try to read a record at that position. Otherwise
- * try to read a record just after the last one previously read.
- *
- * If the read_page callback fails to read the requested data, NULL is
- * returned. The callback is expected to have reported the error; errormsg
- * is set to NULL.
- *
- * If the reading fails for some other reason, NULL is also returned, and
- * *errormsg is set to a string with details of the failure.
- *
- * The returned pointer (or *errormsg) points to an internal buffer that's
- * valid until the next call to XLogReadRecord.
+ * Begin reading WAL at 'recptr'. 'recptr' should point to the beginning of
+ * a valid WAL record. (Pointing at the beginning of a page is also OK, if there is
+ * a new record right after the page header, i.e. not a continuation.)
*/
-XLogRecord *
-XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
+void
+XLogBeginRead(XLogReaderState *state, XLogRecPtr recptr)
{
- XLogRecord *record;
- XLogRecPtr targetPagePtr;
- bool randAccess;
- uint32 len,
- total_len;
- uint32 targetRecOff;
- uint32 pageHeaderSize;
- bool gotheader;
- int readOff;
+ Assert(!XLogRecPtrIsInvalid(recptr));
+
+ ResetDecoder(state);
+
+ /* Begin at the passed-in record pointer. */
+ state->ReadRecPtr = recptr;
+ state->SkipRecPtr = InvalidXLogRecPtr;
/*
- * randAccess indicates whether to verify the previous-record pointer of
- * the record we're reading. We only do this if we're reading
- * sequentially, which is what we initially assume.
+ * We cannot verify the previous-record pointer when we're seeking to a
+ * particular record. Reset PrevRecPtr so that we won't try doing that.
*/
- randAccess = false;
+ state->PrevRecPtr = InvalidXLogRecPtr;
- /* reset error state */
- *errormsg = NULL;
- state->errormsg_buf[0] = '\0';
+ /* also reset EndRecPtr to be tidy */
+ state->EndRecPtr = InvalidXLogRecPtr;
- ResetDecoder(state);
+ /*
+ * Reset verifiedPagePtr to force reading the page again. (Not sure if this
+ * is needed, but seems better to play it safe.)
+ */
+ XLogReaderInvalReadState(state);
- if (RecPtr == InvalidXLogRecPtr)
- {
- /* No explicit start point; read the record after the one we just read */
- RecPtr = state->EndRecPtr;
+ state->readRecordState = XLREAD_NEED_SEGMENT_HEADER;
+}
- if (state->ReadRecPtr == InvalidXLogRecPtr)
- randAccess = true;
+#ifdef FRONTEND
+/*
+ * Functions that are currently not needed in the backend, but are better
+ * implemented inside xlogreader.c because of the internal facilities available
+ * here.
+ */
- /*
- * RecPtr is pointing to end+1 of the previous WAL record. If we're
- * at a page boundary, no more records can fit on the current page. We
- * must skip over the page header, but we can't do that until we've
- * read in the page, since the header size is variable.
- */
- }
- else
- {
- /*
- * Caller supplied a position to start at.
- *
- * In this case, the passed-in record pointer should already be
- * pointing to a valid record starting position.
- */
- Assert(XRecOffIsValid(RecPtr));
- randAccess = true;
- }
+/*
+ * Begin reading at the first record with an lsn >= recptr.
+ *
+ * Useful for checking whether recptr is a valid xlog address for reading, and
+ * to find the first valid address after some address when dumping records for
+ * debugging purposes.
+ */
+void
+XLogFindNextRecord(XLogReaderState *state, XLogRecPtr recptr)
+{
+ Assert(!XLogRecPtrIsInvalid(recptr));
- state->currRecPtr = RecPtr;
+ ResetDecoder(state);
- targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
- targetRecOff = RecPtr % XLOG_BLCKSZ;
+ /* Begin at the passed-in record pointer. */
+ state->ReadRecPtr = recptr;
+ state->SkipRecPtr = recptr;
/*
- * Read the page containing the record into state->readBuf. Request enough
- * byte to cover the whole record header, or at least the part of it that
- * fits on the same page.
+ * We cannot verify the previous-record pointer when we're seeking to a
+ * particular record. Reset PrevRecPtr so that we won't try doing that.
*/
- readOff = ReadPageInternal(state,
- targetPagePtr,
- Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
- if (readOff < 0)
- goto err;
+ state->PrevRecPtr = InvalidXLogRecPtr;
+
+ /* also reset EndRecPtr to be tidy */
+ state->EndRecPtr = InvalidXLogRecPtr;
/*
- * ReadPageInternal always returns at least the page header, so we can
- * examine it now.
+ * Reset verifiedPagePtr to force reading the page again. (Not sure if this
+ * is needed, but seems better to play it safe.)
*/
- pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
- if (targetRecOff == 0)
- {
- /*
- * At page start, so skip over page header.
- */
- RecPtr += pageHeaderSize;
- targetRecOff = pageHeaderSize;
- }
- else if (targetRecOff < pageHeaderSize)
- {
- report_invalid_record(state, "invalid record offset at %X/%X",
- (uint32) (RecPtr >> 32), (uint32) RecPtr);
- goto err;
- }
+ XLogReaderInvalReadState(state);
- if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
- targetRecOff == pageHeaderSize)
- {
- report_invalid_record(state, "contrecord is requested by %X/%X",
- (uint32) (RecPtr >> 32), (uint32) RecPtr);
- goto err;
- }
+ state->readRecordState = XLREAD_NEED_SEGMENT_HEADER;
+}
- /* ReadPageInternal has verified the page header */
- Assert(pageHeaderSize <= readOff);
+#endif /* FRONTEND */
- /*
- * Read the record length.
- *
- * NB: Even though we use an XLogRecord pointer here, the whole record
- * header might not fit on this page. xl_tot_len is the first field of the
- * struct, so it must be on this page (the records are MAXALIGNed), but we
- * cannot access any other fields until we've verified that we got the
- * whole header.
- */
- record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
- total_len = record->xl_tot_len;
+/*
+ * Attempt to read an XLOG record.
+ *
+ * This function runs a state machine and may need to be called several times
+ * until a record is read.
+ *
+ * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
+ * to XLogReadRecord().
+ *
+ * When a record is successfully read, returns XLREAD_SUCCESS with result
+ * record being stored in *record.
+ *
+ * Returns XLREAD_NEED_DATA if more data is needed to finish reading the current
+ * record. In that case, state->loadPagePtr and state->loadLen are set to inform
+ * the caller the WAL position and minimum length of data needed. The caller
+ * shall read in the requested data and set state->readBuf to point to a buffer
+ * containing it. The caller must also set state->readPagePtr, state->readPageTLI,
+ * and state->readLen to indicate the starting position of the read data (which
+ * must equal loadPagePtr), the timeline that it was read from, and the length
+ * of data that is now available (which must be >= loadLen), respectively.
+ *
+ * If invalid data is encountered, XLogReadRecord returns XLREAD_FAIL with *record
+ * being set to NULL. *errormsg is set to a string with details of the failure.
+ *
+ * The returned pointer (or *errormsg) points to an internal buffer that's
+ * valid until the next call to XLogReadRecord.
+ *
+ * Note: DO NOT exit non-locally (e.g. via ereport) from inside this function.
+ */
+
+XLogReadRecordResult
+XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
+{
+ /* reset error state */
+ *errormsg = NULL;
+ state->errormsg_buf[0] = '\0';
/*
- * If the whole record header is on this page, validate it immediately.
- * Otherwise do just a basic sanity check on xl_tot_len, and validate the
- * rest of the header after reading it from the next page. The xl_tot_len
- * check is necessary here to ensure that we enter the "Need to reassemble
- * record" code path below; otherwise we might fail to apply
- * ValidXLogRecordHeader at all.
+ * If the caller provided us with new data, after a XLREAD_NEED_DATA result
+ * from previous call, verify the page header on the data that was provided.
*/
- if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
+ if (state->loadPagePtr != InvalidXLogRecPtr)
{
- if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
- randAccess))
- goto err;
- gotheader = true;
- }
- else
- {
- /* XXX: more validation should be done here */
- if (total_len < SizeOfXLogRecord)
+ XLogPageHeader hdr;
+
+ if (state->readPagePtr != state->loadPagePtr || state->readLen < state->loadLen)
{
- report_invalid_record(state,
- "invalid record length at %X/%X: wanted %u, got %u",
- (uint32) (RecPtr >> 32), (uint32) RecPtr,
- (uint32) SizeOfXLogRecord, total_len);
+ /*
+ * We asked for data, but the caller didn't provide it.
+ *
+ * XXX: Would it be better to return XLREAD_NEED_DATA, and let the
+ * caller retry? There's a good chance that it would lead to an infinite
+ * loop, though.
+ */
+ report_invalid_record(state, "invalid XLogReader state");
goto err;
}
- gotheader = false;
- }
- len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
- if (total_len > len)
- {
- /* Need to reassemble record */
- char *contdata;
- XLogPageHeader pageHeader;
- char *buffer;
- uint32 gotlen;
+ /* We have enough data to check the header length. */
+ hdr = (XLogPageHeader) state->readBuf;
+
+ /* still not enough */
+ if (state->readLen < XLogPageHeaderSize(hdr))
+ {
+ state->verifiedPagePtr = InvalidXLogRecPtr;
+ state->verifiedPageLen = 0;
+ state->loadLen = XLogPageHeaderSize(hdr);
+ return XLREAD_NEED_DATA;
+ }
/*
- * Enlarge readRecordBuf as needed.
+ * Now that we know we have the full header, validate it.
*/
- if (total_len > state->readRecordBufSize &&
- !allocate_recordbuf(state, total_len))
+ if (!XLogReaderValidatePageHeader(state, state->readPagePtr, (char *) state->readBuf))
{
- /* We treat this as a "bogus data" condition */
- report_invalid_record(state, "record length %u at %X/%X too long",
- total_len,
- (uint32) (RecPtr >> 32), (uint32) RecPtr);
+ XLogReaderInvalReadState(state);
goto err;
}
- /* Copy the first fragment of the record from the first page. */
- memcpy(state->readRecordBuf,
- state->readBuf + RecPtr % XLOG_BLCKSZ, len);
- buffer = state->readRecordBuf + len;
- gotlen = len;
+ state->verifiedPagePtr = state->readPagePtr;
+ state->verifiedPageLen = state->readLen;
+ state->loadPagePtr = InvalidXLogRecPtr;
+ state->loadLen = 0;
- do
- {
- /* Calculate pointer to beginning of next page */
- targetPagePtr += XLOG_BLCKSZ;
+ /* we have the requested page (or part of it) in the buffer now. */
+ }
- /* Wait for the next page to become available */
- readOff = ReadPageInternal(state, targetPagePtr,
- Min(total_len - gotlen + SizeOfXLogShortPHD,
- XLOG_BLCKSZ));
+again:
+ switch (state->readRecordState)
+ {
+ /*
+ * NEED_SEGMENT_HEADER is the first state after a call to XLogBeginRead().
+ * In this state, we need to read the first page on the WAL segment containing
+ * the first record to read. This is not needed when we continue to read
+ * after a previous record, because we read the WAL sequentially and will
+ * encounter the first page of each WAL segment in due course, anyway.
+ */
+ case XLREAD_NEED_SEGMENT_HEADER:
+ {
+ XLogRecPtr seg_begin_ptr;
- if (readOff < 0)
- goto err;
+ /* Request the first page in the segment. */
+ seg_begin_ptr = state->ReadRecPtr - state->ReadRecPtr % state->wal_segment_size;
+ if (XLogNeedPage(state, seg_begin_ptr))
+ return XLREAD_NEED_DATA;
- Assert(SizeOfXLogShortPHD <= readOff);
+ if (state->SkipRecPtr != InvalidXLogRecPtr)
+ state->readRecordState = XLREAD_SKIP_CONTRECORDS;
+ else
+ state->readRecordState = XLREAD_NEED_TOT_LEN;
+ goto again;
+ }
- /* Check that the continuation on next page looks valid */
- pageHeader = (XLogPageHeader) state->readBuf;
- if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
+ /*
+ * SKIP_CONTRECORDS state is only entered if the reader was initialized with
+ * XLogFindNextRecord(). In this state, we read the page containing ReadRecPtr,
+ * and skip over any continuation data at the beginning of the page, until
+ * we find the first valid record.
+ */
+ case XLREAD_SKIP_CONTRECORDS:
{
- report_invalid_record(state,
- "there is no contrecord flag at %X/%X",
- (uint32) (RecPtr >> 32), (uint32) RecPtr);
- goto err;
+ XLogRecPtr targetPagePtr;
+ uint32 pageHeaderSize;
+ XLogPageHeader hdr;
+
+ /*
+ * Read the page containing the record into state->readBuf.
+ */
+ targetPagePtr = state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ);
+
+ if (XLogNeedPage(state, targetPagePtr))
+ return XLREAD_NEED_DATA;
+
+ /*
+ * We have loaded at least the page header, so we can examine it now.
+ * (XLogNeedPage() ensures that)
+ */
+ hdr = (XLogPageHeader) state->readBuf;
+ pageHeaderSize = XLogPageHeaderSize(hdr);
+ /* XLogNeedPage has verified the page header */
+ Assert(pageHeaderSize <= state->readLen);
+
+ /*
+ * skip over potential continuation data.
+ */
+ if (((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD)
+ {
+ /*
+ * If the length of the remaining continuation data is more than
+ * what can fit in this page, the continuation record crosses over
+ * this page. Read the next page and try again. xlp_rem_len in the
+ * next page header will contain the remaining length of the
+ * continuation data
+ *
+ * Note that record headers are MAXALIGN'ed
+ */
+ if (MAXALIGN(hdr->xlp_rem_len) > (XLOG_BLCKSZ - pageHeaderSize))
+ {
+ state->ReadRecPtr = targetPagePtr + XLOG_BLCKSZ;
+ /* stay in XLREAD_SKIP_CONTRECORDS state */
+ }
+ else
+ {
+ /*
+ * The previous continuation record ends in this page. Set
+ * ReadRecPtr to point to the first valid record
+ */
+ state->ReadRecPtr = targetPagePtr + pageHeaderSize
+ + MAXALIGN(hdr->xlp_rem_len);
+ state->readRecordState = XLREAD_NEED_TOT_LEN;
+ }
+ }
+ else
+ state->readRecordState = XLREAD_NEED_TOT_LEN;
+ goto again;
}
/*
- * Cross-check that xlp_rem_len agrees with how much of the record
- * we expect there to be left.
+ * Read next record, after the previous one.
+ *
+ * This state is entered when XLogReadRecord() has returned a valid record.
+ * Initialize the state for reading the next record.
+ */
+ case XLREAD_NEXT_RECORD:
+ ResetDecoder(state);
+
+ /*
+ * EndRecPtr is pointing to end+1 of the previous WAL record. If we're
+ * at a page boundary, no more records can fit on the current page. We
+ * must skip over the page header on the next page, but we can't do that until we've
+ * read in the page, since the header size is variable.
+ */
+ state->PrevRecPtr = state->ReadRecPtr;
+ state->ReadRecPtr = state->EndRecPtr;
+
+ state->readRecordState = XLREAD_NEED_TOT_LEN;
+ /* fall through */
+
+ /*
+ * Start reading the record at ReadRecPtr.
+ *
+ * When entering this state, state->ReadRecPtr points to the record
+ * we want to read, and PrevRecPtr to the previous record, if
+ * known.
+ *
+ * We will read as much as we can from the first page containing
+ * ReadRecPtr, and continue in XLREAD_NEED_CONTINUATION state if
+ * needed.
*/
- if (pageHeader->xlp_rem_len == 0 ||
- total_len != (pageHeader->xlp_rem_len + gotlen))
+ case XLREAD_NEED_TOT_LEN:
{
- report_invalid_record(state,
- "invalid contrecord length %u at %X/%X",
- pageHeader->xlp_rem_len,
- (uint32) (RecPtr >> 32), (uint32) RecPtr);
- goto err;
- }
+ uint32 total_len;
+ uint32 pageHeaderSize;
+ XLogRecPtr targetPagePtr;
+ uint32 targetRecOff;
+ uint32 pageRemain;
+ uint32 len;
+ XLogRecord *rechdr;
+
+ /*
+ * Read the page containing the record into state->readBuf.
+ */
+ targetPagePtr = state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ);
+ targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ;
+
+ if (XLogNeedPage(state, targetPagePtr))
+ return XLREAD_NEED_DATA;
+
+ /*
+ * We have loaded at least the page header, so we can examine it now.
+ * (XLogNeedPage() ensures that)
+ */
+ pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+ /* XLogNeedPage has verified the page header */
+ Assert(pageHeaderSize <= state->readLen);
+
+ /*
+ * If the previous record ended at page boundary, then we must skip over the
+ * page header to get to the beginning of the next record.
+ */
+ if (targetRecOff == 0)
+ {
+ state->ReadRecPtr += pageHeaderSize;
+ targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ;
+ }
+
+ /*
+ * sanity check that the starting position doesn't point to
+ * the middle of the page header.
+ */
+ if (targetRecOff < pageHeaderSize)
+ {
+ report_invalid_record(state, "invalid record offset at %X/%X",
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+ goto err;
+ }
- /* Append the continuation from this page to the buffer */
- pageHeaderSize = XLogPageHeaderSize(pageHeader);
+ /*
+ * If we're reading the first record on the page, then the CONTRECORD flag should not
+ * be set.
+ */
+ if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
+ targetRecOff == pageHeaderSize)
+ {
+ report_invalid_record(state, "contrecord is requested by %X/%X",
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+ goto err;
+ }
- if (readOff < pageHeaderSize)
- readOff = ReadPageInternal(state, targetPagePtr,
- pageHeaderSize);
+ /*
+ * The page header looks OK, and targetRecOff points to the
+ * true beginning of the record now. Read the record length,
+ * which is the first field in the record header. The record
+ * must have at least the header, so request SizeOfXLogRecord
+ * or as much of it as fits on this page.
+ */
+ if (XLogNeedData(state, targetPagePtr,
+ Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)))
+ return XLREAD_NEED_DATA;
- Assert(pageHeaderSize <= readOff);
+ /*
+ * Read the record length.
+ *
+ * NB: Even though we use an XLogRecord pointer here, the whole record
+ * header might not fit on this page. xl_tot_len is the first field of the
+ * struct, so it must be on this page (the records are MAXALIGNed), but we
+ * cannot access any other fields until we've verified that we got the
+ * whole header.
+ */
+ rechdr = (XLogRecord *) (state->readBuf + targetRecOff);
+ total_len = rechdr->xl_tot_len;
+
+ /*
+ * If the whole record header is on this page, validate it immediately.
+ * Otherwise do just a basic sanity check on xl_tot_len, and validate the
+ * rest of the header after reading it from the next page. The xl_tot_len
+ * check is necessary here to ensure that we enter the "Need to reassemble
+ * record" code path below; otherwise we might fail to apply
+ * ValidXLogRecordHeader at all.
+ */
+ if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
+ {
+ if (!ValidXLogRecordHeader(state, state->ReadRecPtr, state->PrevRecPtr, rechdr))
+ goto err;
+ }
+ else
+ {
+ /* XXX: more validation should be done here */
+ if (total_len < SizeOfXLogRecord)
+ {
+ report_invalid_record(state,
+ "invalid record length at %X/%X: wanted %u, got %u",
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr,
+ (uint32) SizeOfXLogRecord, total_len);
+ goto err;
+ }
+ }
+
+ /*
+ * Start assembling the record: nothing has been collected yet, and all
+ * total_len bytes of it remain to be read.
+ */
+ state->recordGotLen = 0;
+ state->recordRemainLen = total_len;
- contdata = (char *) state->readBuf + pageHeaderSize;
- len = XLOG_BLCKSZ - pageHeaderSize;
- if (pageHeader->xlp_rem_len < len)
- len = pageHeader->xlp_rem_len;
+ /* Wait for the rest of the record on the first page to become available */
+ pageRemain = XLOG_BLCKSZ - state->ReadRecPtr % XLOG_BLCKSZ;
+ len = Min(pageRemain, total_len);
- if (readOff < pageHeaderSize + len)
- readOff = ReadPageInternal(state, targetPagePtr,
- pageHeaderSize + len);
+ if (XLogNeedData(state, targetPagePtr, targetRecOff + len))
+ return XLREAD_NEED_DATA;
+ rechdr = (XLogRecord *) (state->readBuf + targetRecOff);
- memcpy(buffer, (char *) contdata, len);
- buffer += len;
- gotlen += len;
+ if (state->recordRemainLen <= len)
+ {
+ /* Record does not cross a page boundary */
+ if (!ValidXLogRecord(state, rechdr, state->ReadRecPtr))
+ goto err;
+ /* We already checked the record header earlier */
+ state->EndRecPtr = state->ReadRecPtr + MAXALIGN(total_len);
+ state->readRecordState = XLREAD_NEXT_RECORD;
+ *record = rechdr;
+ break;
+ }
+ else
+ {
+ /*
+ * The record continues on the next page. Need to reassemble it.
+ */
+ /* Enlarge readRecordBuf as needed. */
+ if (total_len > state->readRecordBufSize &&
+ !allocate_recordbuf(state, total_len))
+ {
+ /* We treat this as a "bogus data" condition */
+ report_invalid_record(state, "record length %u at %X/%X too long",
+ total_len,
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+ goto err;
+ }
+
+ /* Copy the first fragment of the record from the first page. */
+ memcpy(state->readRecordBuf,
+ state->readBuf + state->ReadRecPtr % XLOG_BLCKSZ, len);
+ state->recordGotLen += len;
+ state->recordRemainLen -= len;
+
+ /* Calculate pointer to beginning of next page */
+ state->recordContRecPtr = state->ReadRecPtr + len;
+ Assert(state->recordContRecPtr % XLOG_BLCKSZ == 0);
+
+ state->readRecordState = XLREAD_NEED_CONTINUATION;
+ }
+ }
+ /* fall through */
- /* If we just reassembled the record header, validate it. */
- if (!gotheader)
+ case XLREAD_NEED_CONTINUATION:
{
- record = (XLogRecord *) state->readRecordBuf;
- if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
- record, randAccess))
+ XLogRecPtr targetPagePtr;
+ char *contdata;
+ XLogPageHeader pageHeader;
+ uint32 pageHeaderSize;
+ uint32 len;
+
+ /* Wait for the next page to become available */
+ targetPagePtr = state->recordContRecPtr;
+ if (XLogNeedPage(state, targetPagePtr))
+ return XLREAD_NEED_DATA;
+
+ /* Check that the continuation on next page looks valid */
+ pageHeader = (XLogPageHeader) state->readBuf;
+ if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
+ {
+ /* XXX: should this report recordContRecPtr rather than beginning of the
+ * record? */
+ report_invalid_record(state,
+ "there is no contrecord flag at %X/%X",
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
- gotheader = true;
- }
- } while (gotlen < total_len);
+ }
- Assert(gotheader);
+ /*
+ * Cross-check that xlp_rem_len agrees with how much of the record
+ * we expect there to be left.
+ */
+ if (pageHeader->xlp_rem_len == 0 ||
+ pageHeader->xlp_rem_len != state->recordRemainLen)
+ {
+ /* XXX: should this report recordContRecPtr rather than beginning of the
+ * record? */
+ report_invalid_record(state,
+ "invalid contrecord length %u at %X/%X",
+ pageHeader->xlp_rem_len,
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+ goto err;
+ }
- record = (XLogRecord *) state->readRecordBuf;
- if (!ValidXLogRecord(state, record, RecPtr))
- goto err;
+ /* Append the continuation from this page to the buffer */
+ pageHeaderSize = XLogPageHeaderSize(pageHeader);
+ /* XLogNeedPage() above should have ensured the whole page header is loaded */
+ Assert(pageHeaderSize <= state->readLen);
+ Assert(state->readLen >= SizeOfXLogShortPHD);
- pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
- state->ReadRecPtr = RecPtr;
- state->EndRecPtr = targetPagePtr + pageHeaderSize
- + MAXALIGN(pageHeader->xlp_rem_len);
- }
- else
- {
- /* Wait for the record data to become available */
- readOff = ReadPageInternal(state, targetPagePtr,
- Min(targetRecOff + total_len, XLOG_BLCKSZ));
- if (readOff < 0)
- goto err;
+ /* Ok, read the rest of the record */
+ /*
+ * Read the rest of the page containing the record, if we
+ * didn't get it already. (If we didn't get it yet, we'll
+ * read the page header again on next invocation. In
+ * practice, this should happen very rarely, assuming that
+ * the caller makes the whole page available to us even
+ * when we request just a part of it. XXX: This would also
+ * become less ugly, if we passed a flag to XLogNeedData()
+ * on the first call to tell it that the remaining length
+ * includes the page header)
+ */
+ if (XLogNeedData(state, targetPagePtr,
+ Min(pageHeaderSize + state->recordRemainLen,
+ XLOG_BLCKSZ)))
+ return XLREAD_NEED_DATA;
+
+ contdata = (char *) state->readBuf + pageHeaderSize;
+ len = XLOG_BLCKSZ - pageHeaderSize;
+ if (pageHeader->xlp_rem_len < len)
+ len = pageHeader->xlp_rem_len;
+
+ memcpy(state->readRecordBuf + state->recordGotLen,
+ (char *) contdata, len);
+ state->recordGotLen += len;
+ state->recordRemainLen -= len;
+
+ /* If we just reassembled the record header, validate it. */
+ if (state->recordGotLen >= sizeof(XLogRecord) &&
+ state->recordGotLen - len < sizeof(XLogRecord))
+ {
+ *record = (XLogRecord *) state->readRecordBuf;
+ if (!ValidXLogRecordHeader(state, state->ReadRecPtr, state->PrevRecPtr,
+ *record))
+ goto err;
+ }
- /* Record does not cross a page boundary */
- if (!ValidXLogRecord(state, record, RecPtr))
- goto err;
+ if (state->recordRemainLen > 0)
+ {
+ /* Calculate pointer to beginning of next page, and continue */
+ state->recordContRecPtr = targetPagePtr + XLOG_BLCKSZ;
+ goto again;
+ }
+ else
+ {
+ *record = (XLogRecord *) state->readRecordBuf;
+ if (!ValidXLogRecord(state, *record, state->ReadRecPtr))
+ goto err;
- state->EndRecPtr = RecPtr + MAXALIGN(total_len);
+ pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+ state->EndRecPtr = targetPagePtr + pageHeaderSize
+ + MAXALIGN(pageHeader->xlp_rem_len);
- state->ReadRecPtr = RecPtr;
+ state->readRecordState = XLREAD_NEXT_RECORD;
+ break;
+ }
+ }
}
/*
* Special processing if it's an XLOG SWITCH record
*/
- if (record->xl_rmid == RM_XLOG_ID &&
- (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
+ if ((*record)->xl_rmid == RM_XLOG_ID &&
+ ((*record)->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
{
/* Pretend it extends to end of segment */
state->EndRecPtr += state->wal_segment_size - 1;
state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size);
}
- if (DecodeXLogRecord(state, record, errormsg))
- return record;
- else
- return NULL;
+ state->loadPagePtr = InvalidXLogRecPtr;
+ state->loadLen = 0;
+
+ /* all done. But before we claim victory, see if we were asked to skip over this record */
+ if (state->SkipRecPtr > state->ReadRecPtr)
+ goto again;
+
+ if (DecodeXLogRecord(state, *record, errormsg))
+ return XLREAD_SUCCESS;
+
+ *record = NULL;
+ return XLREAD_FAIL;
err:
@@ -509,124 +804,83 @@ err:
if (state->errormsg_buf[0] != '\0')
*errormsg = state->errormsg_buf;
- return NULL;
+ *record = NULL;
+ return XLREAD_FAIL;
}
/*
- * Read a single xlog page including at least [pageptr, reqLen] of valid data
- * via the read_page() callback.
+ * Check that the current buffer (state->verifiedPagePtr) contains the page
+ * starting at 'pageptr'.
*
- * Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the read_page callback).
+ * If the correct page is loaded, returns false.
*
- * We fetch the page from a reader-local cache if we know we have the required
- * data and if there hasn't been any error since caching the data.
+ * If the required page is not yet loaded, returns true, and sets loadPagePtr
+ * and loadLen to request the caller to provide the data. The caller of
+ * XLogReadRecord() should load that region into state->readBuf and call it again.
*/
-static int
-ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
+static bool
+XLogNeedPage(XLogReaderState *state, XLogRecPtr pageptr)
{
- int readLen;
- uint32 targetPageOff;
- XLogSegNo targetSegNo;
- XLogPageHeader hdr;
-
- Assert((pageptr % XLOG_BLCKSZ) == 0);
-
- XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size);
- targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size);
-
- /* check whether we have all the requested data already */
- if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
- reqLen <= state->readLen)
- return state->readLen;
-
- /*
- * Data is not in our buffer.
- *
- * Every time we actually read the page, even if we looked at parts of it
- * before, we need to do verification as the read_page callback might now
- * be rereading data from a different source.
- *
- * Whenever switching to a new WAL segment, we read the first page of the
- * file and validate its header, even if that's not where the target
- * record is. This is so that we can check the additional identification
- * info that is present in the first page's "long" header.
- */
- if (targetSegNo != state->readSegNo && targetPageOff != 0)
- {
- XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
-
- readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
- state->currRecPtr,
- state->readBuf, &state->readPageTLI);
- if (readLen < 0)
- goto err;
-
- /* we can be sure to have enough WAL available, we scrolled back */
- Assert(readLen == XLOG_BLCKSZ);
-
- if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
- state->readBuf))
- goto err;
- }
-
- /*
- * First, read the requested data length, but at least a short page header
- * so that we can validate it.
- */
- readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
- state->currRecPtr,
- state->readBuf, &state->readPageTLI);
- if (readLen < 0)
- goto err;
-
- Assert(readLen <= XLOG_BLCKSZ);
-
- /* Do we have enough data to check the header length? */
- if (readLen <= SizeOfXLogShortPHD)
- goto err;
-
- Assert(readLen >= reqLen);
-
- hdr = (XLogPageHeader) state->readBuf;
-
- /* still not enough */
- if (readLen < XLogPageHeaderSize(hdr))
+ /* check whether we have the requested page in the buffer already */
+ if (state->verifiedPagePtr == pageptr && state->verifiedPageLen >= SizeOfXLogShortPHD)
+ return false;
+ else
{
- readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
- state->currRecPtr,
- state->readBuf, &state->readPageTLI);
- if (readLen < 0)
- goto err;
+ /* request the page header */
+ /*
+ * Request only a short page header's worth of data; callers ask for
+ * more of the page with XLogNeedData() once this page is loaded.
+ */
+ state->verifiedPagePtr = InvalidXLogRecPtr;
+ state->verifiedPageLen = 0;
+ state->loadPagePtr = pageptr;
+ state->loadLen = SizeOfXLogShortPHD;
+ return true;
}
+}
- /*
- * Now that we know we have the full header, validate it.
- */
- if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
- goto err;
+/*
+ * Checks if the page currently loaded into 'readBuf' contains 'reqLen' bytes.
+ *
+ * Returns false if there is enough data. Otherwise, returns true, and sets
+ * loadPagePtr and loadLen to request more.
+ *
+ * Note: This function assumes that you have already called XLogNeedPage()
+ * first, to ensure that the correct page has been loaded into 'readBuf'.
+ */
+static bool
+XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
+{
+ /* This should only be called after XLogNeedPage has verified the page header */
+ Assert(state->readPagePtr == pageptr);
+ Assert(state->readLen == state->verifiedPageLen);
+ Assert(state->verifiedPagePtr == pageptr);
- /* update read state information */
- state->readSegNo = targetSegNo;
- state->readOff = targetPageOff;
- state->readLen = readLen;
+ Assert(reqLen > XLogPageHeaderSize((XLogPageHeader) state->readBuf));
+ Assert(reqLen <= XLOG_BLCKSZ);
- return readLen;
+ /* check whether we have all the requested data already */
+ if (reqLen <= state->verifiedPageLen)
+ return false;
-err:
- XLogReaderInvalReadState(state);
- return -1;
+ /* force the page headers to be re-verified */
+ state->verifiedPagePtr = InvalidXLogRecPtr;
+ state->verifiedPageLen = 0;
+ state->loadPagePtr = pageptr;
+ state->loadLen = reqLen;
+ return true;
}
/*
* Invalidate the xlogreader's read state to force a re-read.
*/
-void
+static void
XLogReaderInvalReadState(XLogReaderState *state)
{
- state->readSegNo = 0;
- state->readOff = 0;
- state->readLen = 0;
+ state->verifiedPagePtr = InvalidXLogRecPtr;
+ state->verifiedPageLen = 0;
+ state->loadPagePtr = InvalidXLogRecPtr;
+ state->loadLen = 0;
}
/*
@@ -634,11 +888,12 @@ XLogReaderInvalReadState(XLogReaderState *state)
*
* This is just a convenience subroutine to avoid duplicated code in
* XLogReadRecord. It's not intended for use from anywhere else.
+ *
+ * If PrevRecPtr is valid, the record's xl_prev is cross-checked against it.
*/
static bool
ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
- XLogRecPtr PrevRecPtr, XLogRecord *record,
- bool randAccess)
+ XLogRecPtr PrevRecPtr, XLogRecord *record)
{
if (record->xl_tot_len < SizeOfXLogRecord)
{
@@ -656,7 +911,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
(uint32) RecPtr);
return false;
}
- if (randAccess)
+ if (PrevRecPtr == InvalidXLogRecPtr)
{
/*
* We can't exactly verify the prev-link, but surely it should be less
@@ -869,134 +1124,6 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
return true;
}
-#ifdef FRONTEND
-/*
- * Functions that are currently not needed in the backend, but are better
- * implemented inside xlogreader.c because of the internal facilities available
- * here.
- */
-
-/*
- * Find the first record with an lsn >= RecPtr.
- *
- * Useful for checking whether RecPtr is a valid xlog address for reading, and
- * to find the first valid address after some address when dumping records for
- * debugging purposes.
- */
-XLogRecPtr
-XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
-{
- XLogReaderState saved_state = *state;
- XLogRecPtr tmpRecPtr;
- XLogRecPtr found = InvalidXLogRecPtr;
- XLogPageHeader header;
- char *errormsg;
-
- Assert(!XLogRecPtrIsInvalid(RecPtr));
-
- /*
- * skip over potential continuation data, keeping in mind that it may span
- * multiple pages
- */
- tmpRecPtr = RecPtr;
- while (true)
- {
- XLogRecPtr targetPagePtr;
- int targetRecOff;
- uint32 pageHeaderSize;
- int readLen;
-
- /*
- * Compute targetRecOff. It should typically be equal or greater than
- * short page-header since a valid record can't start anywhere before
- * that, except when caller has explicitly specified the offset that
- * falls somewhere there or when we are skipping multi-page
- * continuation record. It doesn't matter though because
- * ReadPageInternal() is prepared to handle that and will read at
- * least short page-header worth of data
- */
- targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
-
- /* scroll back to page boundary */
- targetPagePtr = tmpRecPtr - targetRecOff;
-
- /* Read the page containing the record */
- readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
- if (readLen < 0)
- goto err;
-
- header = (XLogPageHeader) state->readBuf;
-
- pageHeaderSize = XLogPageHeaderSize(header);
-
- /* make sure we have enough data for the page header */
- readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
- if (readLen < 0)
- goto err;
-
- /* skip over potential continuation data */
- if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
- {
- /*
- * If the length of the remaining continuation data is more than
- * what can fit in this page, the continuation record crosses over
- * this page. Read the next page and try again. xlp_rem_len in the
- * next page header will contain the remaining length of the
- * continuation data
- *
- * Note that record headers are MAXALIGN'ed
- */
- if (MAXALIGN(header->xlp_rem_len) > (XLOG_BLCKSZ - pageHeaderSize))
- tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
- else
- {
- /*
- * The previous continuation record ends in this page. Set
- * tmpRecPtr to point to the first valid record
- */
- tmpRecPtr = targetPagePtr + pageHeaderSize
- + MAXALIGN(header->xlp_rem_len);
- break;
- }
- }
- else
- {
- tmpRecPtr = targetPagePtr + pageHeaderSize;
- break;
- }
- }
-
- /*
- * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
- * because either we're at the first record after the beginning of a page
- * or we just jumped over the remaining data of a continuation.
- */
- while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL)
- {
- /* continue after the record */
- tmpRecPtr = InvalidXLogRecPtr;
-
- /* past the record we've found, break out */
- if (RecPtr <= state->ReadRecPtr)
- {
- found = state->ReadRecPtr;
- goto out;
- }
- }
-
-err:
-out:
- /* Reset state to what we had before finding the record */
- state->ReadRecPtr = saved_state.ReadRecPtr;
- state->EndRecPtr = saved_state.EndRecPtr;
- XLogReaderInvalReadState(state);
-
- return found;
-}
-
-#endif /* FRONTEND */
-
-
/* ----------------------------------------
* Functions for decoding the data and block references in a record.
* ----------------------------------------
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 1fc39333f15..99d407f123b 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -802,8 +802,7 @@ XLogRead(char *buf, int segsize, TimeLineID tli, XLogRecPtr startptr,
void
XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
{
- const XLogRecPtr lastReadPage = state->readSegNo *
- state->wal_segment_size + state->readOff;
+ const XLogRecPtr lastReadPage = state->readPagePtr;
Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
Assert(wantLength <= XLOG_BLCKSZ);
@@ -907,11 +906,13 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
* exists for normal backends, so we have to do a check/sleep/repeat style of
* loop for now.
*/
-int
-read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *cur_page,
- TimeLineID *pageTLI)
+bool
+read_local_xlog_page(XLogReaderState *state)
{
+ XLogRecPtr targetPagePtr = state->loadPagePtr;
+ int reqLen = state->loadLen;
+ char *cur_page = state->readBuf;
+ TimeLineID *pageTLI = &state->readPageTLI;
XLogRecPtr read_upto,
loc;
int count;
@@ -1009,7 +1010,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
else if (targetPagePtr + reqLen > read_upto)
{
/* not enough data there */
- return -1;
+ state->readPagePtr = InvalidXLogRecPtr;
+ state->readLen = -1;
+ return false;
}
else
{
@@ -1026,5 +1029,7 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
XLOG_BLCKSZ);
/* number of valid bytes in the buffer */
- return count;
+ state->readPagePtr = targetPagePtr;
+ state->readLen = count;
+ return true;
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index f8b9020081e..bff427395a1 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -124,7 +124,7 @@ StartupDecodingContext(List *output_plugin_options,
TransactionId xmin_horizon,
bool need_full_snapshot,
bool fast_forward,
- XLogPageReadCB read_page,
+ LogicalDecodingXLogReadPageCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -173,11 +173,13 @@ StartupDecodingContext(List *output_plugin_options,
ctx->slot = slot;
- ctx->reader = XLogReaderAllocate(wal_segment_size, read_page, ctx);
+ ctx->reader = XLogReaderAllocate(wal_segment_size);
if (!ctx->reader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
+ ctx->reader->readBuf = palloc(XLOG_BLCKSZ);
+ ctx->read_page = read_page;
ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder =
@@ -232,7 +234,7 @@ CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
- XLogPageReadCB read_page,
+ LogicalDecodingXLogReadPageCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -374,7 +376,7 @@ LogicalDecodingContext *
CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
bool fast_forward,
- XLogPageReadCB read_page,
+ LogicalDecodingXLogReadPageCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -465,11 +467,10 @@ DecodingContextReady(LogicalDecodingContext *ctx)
void
DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
{
- XLogRecPtr startptr;
ReplicationSlot *slot = ctx->slot;
/* Initialize from where to start reading WAL. */
- startptr = slot->data.restart_lsn;
+ XLogBeginRead(ctx->reader, slot->data.restart_lsn);
elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
(uint32) (slot->data.restart_lsn >> 32),
@@ -482,14 +483,21 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
char *err = NULL;
/* the read_page callback waits for new WAL */
- record = XLogReadRecord(ctx->reader, startptr, &err);
+ while (XLogReadRecord(ctx->reader, &record, &err) ==
+ XLREAD_NEED_DATA)
+ {
+ if (!ctx->read_page(ctx))
+ {
+ record = NULL;
+ break;
+ }
+ }
+
if (err)
elog(ERROR, "%s", err);
if (!record)
elog(ERROR, "no record found"); /* shouldn't happen */
- startptr = InvalidXLogRecPtr;
-
LogicalDecodingProcessRecord(ctx, ctx->reader);
/* only continue till we found a consistent spot */
@@ -516,6 +524,7 @@ FreeDecodingContext(LogicalDecodingContext *ctx)
ReorderBufferFree(ctx->reorder);
FreeSnapshotBuilder(ctx->snapshot_builder);
+ pfree(ctx->reader->readBuf);
XLogReaderFree(ctx->reader);
MemoryContextDelete(ctx->context);
}
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index d974400d6ef..fccae9a26ed 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -114,12 +114,10 @@ check_permissions(void)
(errmsg("must be superuser or replication role to use replication slots"))));
}
-int
-logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+bool
+logical_read_local_xlog_page(LogicalDecodingContext *ctx)
{
- return read_local_xlog_page(state, targetPagePtr, reqLen,
- targetRecPtr, cur_page, pageTLI);
+ return read_local_xlog_page(ctx->reader);
}
/*
@@ -135,7 +133,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
MemoryContext per_query_ctx;
MemoryContext oldcontext;
XLogRecPtr end_of_wal;
- XLogRecPtr startptr;
+ XLogRecPtr nextptr;
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
ArrayType *arr;
@@ -277,28 +275,31 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
* xacts that committed after the slot's confirmed_flush can be
* accumulated into reorder buffers.
*/
- startptr = MyReplicationSlot->data.restart_lsn;
+ nextptr = MyReplicationSlot->data.restart_lsn;
+ XLogBeginRead(ctx->reader, nextptr);
/* invalidate non-timetravel entries */
InvalidateSystemCaches();
/* Decode until we run out of records */
- while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
- (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < end_of_wal))
+ while (nextptr < end_of_wal)
{
XLogRecord *record;
char *errm = NULL;
- record = XLogReadRecord(ctx->reader, startptr, &errm);
+ while (XLogReadRecord(ctx->reader, &record, &errm) ==
+ XLREAD_NEED_DATA)
+ {
+ if (!ctx->read_page(ctx))
+ {
+ record = NULL;
+ break;
+ }
+ }
+
if (errm)
elog(ERROR, "%s", errm);
- /*
- * Now that we've set up the xlog reader state, subsequent calls
- * pass InvalidXLogRecPtr to say "continue from last record"
- */
- startptr = InvalidXLogRecPtr;
-
/*
* The {begin_txn,change,commit_txn}_wrapper callbacks above will
* store the description into our tuplestore.
@@ -313,6 +314,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
if (upto_nchanges != 0 &&
upto_nchanges <= p->returned_rows)
break;
+
+ nextptr = ctx->reader->EndRecPtr;
+
CHECK_FOR_INTERRUPTS();
}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 808a6f5b836..0f40215fc12 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -392,7 +392,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
{
LogicalDecodingContext *ctx;
ResourceOwner old_resowner = CurrentResourceOwner;
- XLogRecPtr startlsn;
+ XLogRecPtr nextlsn;
XLogRecPtr retlsn;
PG_TRY();
@@ -412,7 +412,8 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
* Start reading at the slot's restart_lsn, which we know to point to
* a valid record.
*/
- startlsn = MyReplicationSlot->data.restart_lsn;
+ nextlsn = MyReplicationSlot->data.restart_lsn;
+ XLogBeginRead(ctx->reader, nextlsn);
/* Initialize our return value in case we don't do anything */
retlsn = MyReplicationSlot->data.confirmed_flush;
@@ -421,10 +422,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
InvalidateSystemCaches();
/* Decode at least one record, until we run out of records */
- while ((!XLogRecPtrIsInvalid(startlsn) &&
- startlsn < moveto) ||
- (!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) &&
- ctx->reader->EndRecPtr < moveto))
+ while (nextlsn < moveto)
{
char *errm = NULL;
XLogRecord *record;
@@ -433,13 +431,19 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
* Read records. No changes are generated in fast_forward mode,
* but snapbuilder/slot statuses are updated properly.
*/
- record = XLogReadRecord(ctx->reader, startlsn, &errm);
+ while (XLogReadRecord(ctx->reader, &record, &errm) ==
+ XLREAD_NEED_DATA)
+ {
+ if (!ctx->read_page(ctx))
+ {
+ record = NULL;
+ break;
+ }
+ }
+
if (errm)
elog(ERROR, "%s", errm);
- /* Read sequentially from now on */
- startlsn = InvalidXLogRecPtr;
-
/*
* Process the record. Storage-level changes are ignored in
* fast_forward mode, but other modules (such as snapbuilder)
@@ -452,6 +456,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
if (moveto <= ctx->reader->EndRecPtr)
break;
+ nextlsn = ctx->reader->EndRecPtr;
CHECK_FOR_INTERRUPTS();
}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 23870a25a56..8141d938ed2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -761,10 +761,14 @@ StartReplication(StartReplicationCmd *cmd)
* which has to do a plain sleep/busy loop, because the walsender's latch gets
* set every time WAL is flushed.
*/
-static int
-logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
- XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
+static bool
+logical_read_xlog_page(LogicalDecodingContext *ctx)
{
+ XLogReaderState *state = ctx->reader;
+ XLogRecPtr targetPagePtr = state->loadPagePtr;
+ int reqLen = state->loadLen;
+ char *cur_page = state->readBuf;
+
XLogRecPtr flushptr;
int count;
@@ -779,7 +783,11 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
/* fail if not (implies we are going to shut down) */
if (flushptr < targetPagePtr + reqLen)
- return -1;
+ {
+ state->readPagePtr = InvalidXLogRecPtr;
+ state->readLen = -1;
+ return false;
+ }
if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
count = XLOG_BLCKSZ; /* more than one block available */
@@ -789,7 +797,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
/* now actually read the data, we know it's there */
XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
- return count;
+ state->readPagePtr = targetPagePtr;
+ state->readLen = count;
+ return true;
}
/*
@@ -2823,8 +2833,21 @@ XLogSendLogical(void)
*/
WalSndCaughtUp = false;
- record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
- logical_startptr = InvalidXLogRecPtr;
+ if (logical_startptr != InvalidXLogRecPtr)
+ {
+ XLogBeginRead(logical_decoding_ctx->reader, logical_startptr);
+ logical_startptr = InvalidXLogRecPtr;
+ }
+
+ while (XLogReadRecord(logical_decoding_ctx->reader,
+ &record, &errm) == XLREAD_NEED_DATA)
+ {
+ if (!logical_decoding_ctx->read_page(logical_decoding_ctx))
+ {
+ record = NULL;
+ break;
+ }
+ }
/* xlog record was invalid */
if (errm != NULL)
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 63c3879ead8..41709cba7a5 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -41,16 +41,8 @@ static int xlogreadfd = -1;
static XLogSegNo xlogreadsegno = -1;
static char xlogfpath[MAXPGPATH];
-typedef struct XLogPageReadPrivate
-{
- const char *datadir;
- int tliIndex;
-} XLogPageReadPrivate;
-
-static int SimpleXLogPageRead(XLogReaderState *xlogreader,
- XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
- TimeLineID *pageTLI);
+static void SimpleXLogPageRead(XLogReaderState *xlogreader,
+ const char *datadir, int *tliIndex);
/*
* Read WAL from the datadir/pg_wal, starting from 'startpoint' on timeline
@@ -64,24 +56,25 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
XLogRecord *record;
XLogReaderState *xlogreader;
char *errormsg;
- XLogPageReadPrivate private;
- private.datadir = datadir;
- private.tliIndex = tliIndex;
- xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
- &private);
+ xlogreader = XLogReaderAllocate(WalSegSz);
if (xlogreader == NULL)
pg_fatal("out of memory");
+ xlogreader->readBuf = pg_malloc(XLOG_BLCKSZ);
+
+ XLogBeginRead(xlogreader, startpoint);
do
{
- record = XLogReadRecord(xlogreader, startpoint, &errormsg);
+ while (XLogReadRecord(xlogreader, &record, &errormsg) ==
+ XLREAD_NEED_DATA)
+ {
+ SimpleXLogPageRead(xlogreader, datadir, &tliIndex);
+ }
if (record == NULL)
{
- XLogRecPtr errptr;
-
- errptr = startpoint ? startpoint : xlogreader->EndRecPtr;
+ XLogRecPtr errptr = xlogreader->EndRecPtr;
if (errormsg)
pg_fatal("could not read WAL record at %X/%X: %s",
@@ -93,11 +86,9 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
}
extractPageInfo(xlogreader);
-
- startpoint = InvalidXLogRecPtr; /* continue reading at next record */
-
} while (xlogreader->ReadRecPtr != endpoint);
+ pg_free(xlogreader->readBuf);
XLogReaderFree(xlogreader);
if (xlogreadfd != -1)
{
@@ -116,17 +107,21 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex)
XLogRecord *record;
XLogReaderState *xlogreader;
char *errormsg;
- XLogPageReadPrivate private;
XLogRecPtr endptr;
- private.datadir = datadir;
- private.tliIndex = tliIndex;
- xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
- &private);
+ xlogreader = XLogReaderAllocate(WalSegSz);
if (xlogreader == NULL)
pg_fatal("out of memory");
+ xlogreader->readBuf = pg_malloc(XLOG_BLCKSZ);
+
+ XLogBeginRead(xlogreader, ptr);
+
+ while (XLogReadRecord(xlogreader, &record, &errormsg) ==
+ XLREAD_NEED_DATA)
+ {
+ SimpleXLogPageRead(xlogreader, datadir, &tliIndex);
+ }
- record = XLogReadRecord(xlogreader, ptr, &errormsg);
if (record == NULL)
{
if (errormsg)
@@ -138,6 +133,7 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex)
}
endptr = xlogreader->EndRecPtr;
+ pg_free(xlogreader->readBuf);
XLogReaderFree(xlogreader);
if (xlogreadfd != -1)
{
@@ -161,7 +157,6 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
XLogRecPtr searchptr;
XLogReaderState *xlogreader;
char *errormsg;
- XLogPageReadPrivate private;
/*
* The given fork pointer points to the end of the last common record,
@@ -177,19 +172,22 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
forkptr += SizeOfXLogShortPHD;
}
- private.datadir = datadir;
- private.tliIndex = tliIndex;
- xlogreader = XLogReaderAllocate(WalSegSz, &SimpleXLogPageRead,
- &private);
+ xlogreader = XLogReaderAllocate(WalSegSz);
if (xlogreader == NULL)
pg_fatal("out of memory");
+ xlogreader->readBuf = pg_malloc(XLOG_BLCKSZ);
searchptr = forkptr;
for (;;)
{
uint8 info;
- record = XLogReadRecord(xlogreader, searchptr, &errormsg);
+ XLogBeginRead(xlogreader, searchptr);
+ while (XLogReadRecord(xlogreader, &record, &errormsg) ==
+ XLREAD_NEED_DATA)
+ {
+ SimpleXLogPageRead(xlogreader, datadir, &tliIndex);
+ }
if (record == NULL)
{
@@ -226,6 +224,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
searchptr = record->xl_prev;
}
+ pg_free(xlogreader->readBuf);
XLogReaderFree(xlogreader);
if (xlogreadfd != -1)
{
@@ -234,13 +233,14 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
}
}
-/* XLogReader callback function, to read a WAL page */
-static int
-SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
- TimeLineID *pageTLI)
+/* Read the WAL page requested by xlogreader into xlogreader->readBuf */
+static void
+SimpleXLogPageRead(XLogReaderState *xlogreader,
+ const char*datadir, int *tliIndex)
{
- XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
+ XLogRecPtr targetPagePtr = xlogreader->loadPagePtr;
+ char *readBuf = xlogreader->readBuf;
+ TimeLineID *pageTLI = &xlogreader->readPageTLI;
uint32 targetPageOff;
XLogRecPtr targetSegEnd;
XLogSegNo targetSegNo;
@@ -273,24 +273,26 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
* be done both forward and backward, consider also switching timeline
* accordingly.
*/
- while (private->tliIndex < targetNentries - 1 &&
- targetHistory[private->tliIndex].end < targetSegEnd)
- private->tliIndex++;
- while (private->tliIndex > 0 &&
- targetHistory[private->tliIndex].begin >= targetSegEnd)
- private->tliIndex--;
-
- XLogFileName(xlogfname, targetHistory[private->tliIndex].tli,
+ while (*tliIndex < targetNentries - 1 &&
+ targetHistory[*tliIndex].end < targetSegEnd)
+ (*tliIndex)++;
+ while (*tliIndex > 0 &&
+ targetHistory[*tliIndex].begin >= targetSegEnd)
+ (*tliIndex)--;
+
+ XLogFileName(xlogfname, targetHistory[*tliIndex].tli,
xlogreadsegno, WalSegSz);
- snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", private->datadir, xlogfname);
+ snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", datadir, xlogfname);
xlogreadfd = open(xlogfpath, O_RDONLY | PG_BINARY, 0);
if (xlogreadfd < 0)
{
pg_log_error("could not open file \"%s\": %m", xlogfpath);
- return -1;
+ xlogreader->readPagePtr = InvalidXLogRecPtr;
+ xlogreader->readLen = -1;
+ return;
}
}
@@ -303,7 +305,9 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0)
{
pg_log_error("could not seek in file \"%s\": %m", xlogfpath);
- return -1;
+ xlogreader->readPagePtr = InvalidXLogRecPtr;
+ xlogreader->readLen = -1;
+ return;
}
@@ -316,13 +320,18 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
pg_log_error("could not read file \"%s\": read %d of %zu",
xlogfpath, r, (Size) XLOG_BLCKSZ);
- return -1;
+ xlogreader->readPagePtr = InvalidXLogRecPtr;
+ xlogreader->readLen = -1;
+ return;
}
Assert(targetSegNo == xlogreadsegno);
- *pageTLI = targetHistory[private->tliIndex].tli;
- return XLOG_BLCKSZ;
+ *pageTLI = targetHistory[*tliIndex].tli;
+
+ xlogreader->readPagePtr = targetPagePtr;
+ xlogreader->readLen = XLOG_BLCKSZ;
+ return;
}
/*
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index b95d467805a..c82c39e7147 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -419,13 +419,14 @@ XLogDumpXLogRead(const char *directory, TimeLineID timeline_id,
}
/*
- * XLogReader read_page callback
+ * Read the page requested by XLogReadRecord() into state->readBuf.
*/
-static int
-XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
- XLogRecPtr targetPtr, char *readBuff, TimeLineID *curFileTLI)
+static void
+XLogDumpReadPage(XLogReaderState *state, XLogDumpPrivate *private)
{
- XLogDumpPrivate *private = state->private_data;
+ XLogRecPtr targetPagePtr = state->loadPagePtr;
+ int reqLen = state->loadLen;
+ char *readBuff = state->readBuf;
int count = XLOG_BLCKSZ;
if (private->endptr != InvalidXLogRecPtr)
@@ -437,14 +438,18 @@ XLogDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
else
{
private->endptr_reached = true;
- return -1;
+ state->readPagePtr = InvalidXLogRecPtr;
+ state->readLen = -1;
+ return;
}
}
XLogDumpXLogRead(private->inpath, private->timeline, targetPagePtr,
readBuff, count);
- return count;
+ state->readPagePtr = targetPagePtr;
+ state->readLen = count;
+ return;
}
/*
@@ -1100,37 +1105,23 @@ main(int argc, char **argv)
/* done with argument parsing, do the actual work */
/* we have everything we need, start reading */
- xlogreader_state = XLogReaderAllocate(WalSegSz, XLogDumpReadPage,
- &private);
+ xlogreader_state = XLogReaderAllocate(WalSegSz);
if (!xlogreader_state)
fatal_error("out of memory");
+ xlogreader_state->readBuf = palloc(XLOG_BLCKSZ);
/* first find a valid recptr to start from */
- first_record = XLogFindNextRecord(xlogreader_state, private.startptr);
-
- if (first_record == InvalidXLogRecPtr)
- fatal_error("could not find a valid record after %X/%X",
- (uint32) (private.startptr >> 32),
- (uint32) private.startptr);
-
- /*
- * Display a message that we're skipping data if `from` wasn't a pointer
- * to the start of a record and also wasn't a pointer to the beginning of
- * a segment (e.g. we were used in file mode).
- */
- if (first_record != private.startptr &&
- XLogSegmentOffset(private.startptr, WalSegSz) != 0)
- printf(ngettext("first record is after %X/%X, at %X/%X, skipping over %u byte\n",
- "first record is after %X/%X, at %X/%X, skipping over %u bytes\n",
- (first_record - private.startptr)),
- (uint32) (private.startptr >> 32), (uint32) private.startptr,
- (uint32) (first_record >> 32), (uint32) first_record,
- (uint32) (first_record - private.startptr));
+ XLogFindNextRecord(xlogreader_state, private.startptr);
for (;;)
{
/* try to read the next record */
- record = XLogReadRecord(xlogreader_state, first_record, &errormsg);
+ while (XLogReadRecord(xlogreader_state, &record, &errormsg) ==
+ XLREAD_NEED_DATA)
+ {
+ XLogDumpReadPage(xlogreader_state, &private);
+ }
+
if (!record)
{
if (!config.follow || private.endptr_reached)
@@ -1142,8 +1133,23 @@ main(int argc, char **argv)
}
}
- /* after reading the first record, continue at next one */
- first_record = InvalidXLogRecPtr;
+ if (first_record == InvalidXLogRecPtr)
+ {
+ /*
+ * Display a message that we're skipping data if `from` wasn't a pointer
+ * to the start of a record and also wasn't a pointer to the beginning of
+ * a segment (e.g. we were used in file mode).
+ */
+ first_record = xlogreader_state->ReadRecPtr;
+ if (first_record != private.startptr &&
+ XLogSegmentOffset(private.startptr, WalSegSz) != 0)
+ printf(ngettext("first record is after %X/%X, at %X/%X, skipping over %u byte\n",
+ "first record is after %X/%X, at %X/%X, skipping over %u bytes\n",
+ (first_record - private.startptr)),
+ (uint32) (private.startptr >> 32), (uint32) private.startptr,
+ (uint32) (first_record >> 32), (uint32) first_record,
+ (uint32) (first_record - private.startptr));
+ }
/* apply all specified filters */
if (config.filter_by_rmgr != -1 &&
@@ -1176,6 +1182,7 @@ main(int argc, char **argv)
(uint32) xlogreader_state->ReadRecPtr,
errormsg);
+ pfree(xlogreader_state->readBuf);
XLogReaderFree(xlogreader_state);
return EXIT_SUCCESS;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index aa9bc637259..5e472893e6b 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -9,17 +9,60 @@
* src/include/access/xlogreader.h
*
* NOTES
- * See the definition of the XLogReaderState struct for instructions on
- * how to use the XLogReader infrastructure.
*
- * The basic idea is to allocate an XLogReaderState via
- * XLogReaderAllocate(), and call XLogReadRecord() until it returns NULL.
+ * Usage
*
- * After reading a record with XLogReadRecord(), it's decomposed into
- * the per-block and main data parts, and the parts can be accessed
- * with the XLogRec* macros and functions. You can also decode a
- * record that's already constructed in memory, without reading from
- * disk, by calling the DecodeXLogRecord() function.
+ * 1. Allocate a new xlogreader with XLogReaderAllocate().
+ * 2. Position the reader to the desired starting point with XLogBeginRead() or
+ * XLogFindNextRecord().
+ * 3. Call XLogReadRecord(). Whenever it returns XLREAD_NEED_DATA, provide the
+ * requested data in readBuf, and call it again.
+ *
+ * xlogreader = XLogReaderAllocate(wal_segment_size);
+ * if (!xlogreader)
+ * elog(ERROR, "out of memory");
+ *
+ * XLogBeginRead(xlogreader, starting_point);
+ * for (;;)
+ * {
+ * XLogReadRecordResult rc;
+ * XLogRecord *record;
+ * char *errormsg;
+ *
+ * rc = XLogReadRecord(xlogreader, &record, &errormsg);
+ * if (rc == XLREAD_NEED_DATA)
+ * {
+ * ... Load the data requested by xlogreader->loadPagePtr and loadLen
+ * ... into a buffer, then describe that buffer to the xlogreader:
+ * xlogreader->readBuf = <buffer>;
+ * xlogreader->readPagePtr = xxx;
+ * xlogreader->readLen = xxx;
+ * xlogreader->readPageTLI = xxx;
+ * continue;
+ * }
+ *
+ * if (rc == XLREAD_FAIL)
+ * elog(ERROR, "read failed: %s", errormsg);
+ *
+ * Assert(rc == XLREAD_SUCCESS);
+ *
+ * ... process the record, using XLogRec* macros ...
+ * }
+ *
+ *
+ * It is up to the caller to manage the read buffer, at 'readBuf'. A common
+ * strategy is to allocate a buffer of size XLOG_BLCKSZ at the same time the
+ * xlogreader is allocated, and read the requested data into the same buffer.
+ * However, XLogReadRecord() assumes that the buffer contents do not change
+ * between calls, unless XLogReadRecord() asked for a new page by returning
+ * XLREAD_NEED_DATA, or the caller resets the position with XLogBeginRead().
+ *
+ *
+ * When XLogReadRecord() finishes reading a record, it returns XLREAD_SUCCESS.
+ * The record has been decomposed into the per-block and main data parts, and
+ * the parts can be accessed with the XLogRec* macros and functions. You can
+ * also decode a record that's already constructed in memory, without reading
+ * from disk, by calling the DecodeXLogRecord() function.
*-------------------------------------------------------------------------
*/
#ifndef XLOGREADER_H
@@ -33,14 +76,6 @@
typedef struct XLogReaderState XLogReaderState;
-/* Function type definition for the read_page callback */
-typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
- XLogRecPtr targetPagePtr,
- int reqLen,
- XLogRecPtr targetRecPtr,
- char *readBuf,
- TimeLineID *pageTLI);
-
typedef struct
{
/* Is this block ref in use? */
@@ -82,29 +117,6 @@ struct XLogReaderState
*/
int wal_segment_size;
- /*
- * Data input callback (mandatory).
- *
- * This callback shall read at least reqLen valid bytes of the xlog page
- * starting at targetPagePtr, and store them in readBuf. The callback
- * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
- * -1 on failure. The callback shall sleep, if necessary, to wait for the
- * requested bytes to become available. The callback will not be invoked
- * again for the same page unless more than the returned number of bytes
- * are needed.
- *
- * targetRecPtr is the position of the WAL record we're reading. Usually
- * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
- * to read and verify the page or segment header, before it reads the
- * actual WAL record it's interested in. In that case, targetRecPtr can
- * be used to determine which timeline to read the page from.
- *
- * The callback shall set *pageTLI to the TLI of the file the page was
- * read from. It is currently used only for error reporting purposes, to
- * reconstruct the name of the WAL file where an error occurred.
- */
- XLogPageReadCB read_page;
-
/*
* System identifier of the xlog files we're about to read. Set to zero
* (the default value) if unknown or unimportant.
@@ -112,23 +124,45 @@ struct XLogReaderState
uint64 system_identifier;
/*
- * Opaque data for callbacks to use. Not used by XLogReader.
+ * Start and end point of last record read. (Use XLogBeginRead() to set these)
*/
- void *private_data;
+ XLogRecPtr ReadRecPtr; /* start of last record read or being read */
+ XLogRecPtr EndRecPtr; /* end+1 of last record read */
+ XLogRecPtr PrevRecPtr; /* start of previous record read */
/*
- * Start and end point of last record read. EndRecPtr is also used as the
- * position to read next, if XLogReadRecord receives an invalid recptr.
+ * If we are skipping records to find the starting point
+ * (because XLogFindNextRecord() was called), SkipRecPtr is the point to
+ * skip to.
*/
- XLogRecPtr ReadRecPtr; /* start of last record read */
- XLogRecPtr EndRecPtr; /* end+1 of last record read */
+ XLogRecPtr SkipRecPtr;
+
+ /* ----------------------------------------
+ * Communication with page reader.
+ *
+ * When XLogReadRecord() returns XLREAD_NEED_DATA, the caller must provide
+ * the data indicated by loadPagePtr/loadLen, before calling XLogReadRecord()
+ * again. To provide the data, set the readPagePtr, readLen, readPageTLI and
+ * readBuf fields.
+ * ----------------------------------------
+ */
+ /* parameters to page reader */
+ XLogRecPtr loadPagePtr; /* Pointer to the page */
+ int loadLen; /* wanted length in bytes */
+ /* return from page reader */
+ XLogRecPtr readPagePtr;
+ int32 readLen; /* bytes actually read, must be at least
+ * loadLen. -1 on error. */
+ TimeLineID readPageTLI; /* TLI for data currently in readBuf */
+ char *readBuf; /* buffer to store data */
/* ----------------------------------------
* Decoded representation of current record
*
* Use XLogRecGet* functions to investigate the record; these fields
- * should not be accessed directly.
+ * should not be accessed directly. The current record is valid after
+ * XLogReadRecord() returns XLREAD_SUCCESS, until the next call.
* ----------------------------------------
*/
XLogRecord *decoded_record; /* currently decoded record */
@@ -149,17 +183,12 @@ struct XLogReaderState
* ----------------------------------------
*/
- /*
- * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least
- * readLen bytes)
- */
- char *readBuf;
- uint32 readLen;
+ /* state of the XLogReadRecord state machine */
+ int readRecordState; /* enum */
- /* last read segment, segment offset, TLI for data currently in readBuf */
- XLogSegNo readSegNo;
- uint32 readOff;
- TimeLineID readPageTLI;
+ /* position and length of the already-verified page data in readBuf */
+ XLogRecPtr verifiedPagePtr;
+ uint32 verifiedPageLen;
/*
* beginning of prior page read, and its TLI. Doesn't necessarily
@@ -168,8 +197,30 @@ struct XLogReaderState
XLogRecPtr latestPagePtr;
TimeLineID latestPageTLI;
- /* beginning of the WAL record being read. */
- XLogRecPtr currRecPtr;
+ /*
+ * Buffer for current ReadRecord result (expandable), used when a record
+ * crosses a page boundary.
+ */
+ char *readRecordBuf;
+ uint32 readRecordBufSize;
+
+ int recordGotLen; /* amount of current record that has already been read */
+ int recordRemainLen; /* length of current record that remains */
+ XLogRecPtr recordContRecPtr; /* where the current record continues */
+
+ /* Buffer to hold error message */
+ char *errormsg_buf;
+
+
+
+ /* ----------------------------------------
+ * private fields used by xlogutils.c
+ *
+ * XXX: These are not used by xlogreader itself. They don't really belong
+ * here.
+ * ----------------------------------------
+ */
+
/* timeline to read it from, 0 if a lookup is required */
TimeLineID currTLI;
@@ -188,41 +239,37 @@ struct XLogReaderState
* read from when currTLIValidUntil is reached.
*/
TimeLineID nextTLI;
-
- /*
- * Buffer for current ReadRecord result (expandable), used when a record
- * crosses a page boundary.
- */
- char *readRecordBuf;
- uint32 readRecordBufSize;
-
- /* Buffer to hold error message */
- char *errormsg_buf;
};
/* Get a new XLogReader */
-extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
- XLogPageReadCB pagereadfunc,
- void *private_data);
+extern XLogReaderState *XLogReaderAllocate(int wal_segment_size);
/* Free an XLogReader */
extern void XLogReaderFree(XLogReaderState *state);
-/* Read the next XLog record. Returns NULL on end-of-WAL or failure */
-extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
- XLogRecPtr recptr, char **errormsg);
+/* Position the XLogReader to given record */
+extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr recptr);
+#ifdef FRONTEND
+extern void XLogFindNextRecord(XLogReaderState *state, XLogRecPtr recptr);
+#endif /* FRONTEND */
+
+/* Return code from XLogReadRecord */
+typedef enum XLogReadRecordResult
+{
+ XLREAD_SUCCESS, /* record is successfully read */
+ XLREAD_NEED_DATA, /* need more data. see XLogReadRecord. */
+ XLREAD_FAIL /* failed during reading a record */
+} XLogReadRecordResult;
+
+/* Read the next XLog record. */
+extern XLogReadRecordResult XLogReadRecord(XLogReaderState *state,
+ XLogRecord **record,
+ char **errormsg);
/* Validate a page */
extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
XLogRecPtr recptr, char *phdr);
-/* Invalidate read state */
-extern void XLogReaderInvalReadState(XLogReaderState *state);
-
-#ifdef FRONTEND
-extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
-#endif /* FRONTEND */
-
/* Functions for decoding an XLogRecord */
extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 4105b59904b..55a9b6237ab 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -47,10 +47,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
extern void FreeFakeRelcacheEntry(Relation fakerel);
-extern int read_local_xlog_page(XLogReaderState *state,
- XLogRecPtr targetPagePtr, int reqLen,
- XLogRecPtr targetRecPtr, char *cur_page,
- TimeLineID *pageTLI);
+extern bool read_local_xlog_page(XLogReaderState *state);
extern void XLogReadDetermineTimeline(XLogReaderState *state,
XLogRecPtr wantPage, uint32 wantLength);
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 31c796b7651..befe8ebbeb5 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -30,7 +30,11 @@ typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingC
TransactionId xid
);
-typedef struct LogicalDecodingContext
+typedef struct LogicalDecodingContext LogicalDecodingContext;
+
+typedef bool (*LogicalDecodingXLogReadPageCB)(LogicalDecodingContext *ctx);
+
+struct LogicalDecodingContext
{
/* memory context this is all allocated in */
MemoryContext context;
@@ -40,6 +44,7 @@ typedef struct LogicalDecodingContext
/* infrastructure pieces for decoding */
XLogReaderState *reader;
+ LogicalDecodingXLogReadPageCB read_page;
struct ReorderBuffer *reorder;
struct SnapBuild *snapshot_builder;
@@ -87,7 +92,7 @@ typedef struct LogicalDecodingContext
bool prepared_write;
XLogRecPtr write_location;
TransactionId write_xid;
-} LogicalDecodingContext;
+};
extern void CheckLogicalDecodingRequirements(void);
@@ -96,14 +101,14 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
- XLogPageReadCB read_page,
+ LogicalDecodingXLogReadPageCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);
extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
bool fast_forward,
- XLogPageReadCB read_page,
+ LogicalDecodingXLogReadPageCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
index a9c178a9e68..25fa68d5b93 100644
--- a/src/include/replication/logicalfuncs.h
+++ b/src/include/replication/logicalfuncs.h
@@ -11,9 +11,6 @@
#include "replication/logical.h"
-extern int logical_read_local_xlog_page(XLogReaderState *state,
- XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr,
- char *cur_page, TimeLineID *pageTLI);
+extern bool logical_read_local_xlog_page(LogicalDecodingContext *ctx);
#endif